diff --git a/.npmignore b/.npmignore index 5278f4a..aee4eb9 100644 --- a/.npmignore +++ b/.npmignore @@ -1,5 +1,5 @@ index.js.map -Gruntfile.js +gulpfile.js .npmignore .idea/ .vscode/ diff --git a/Gruntfile.js b/Gruntfile.js deleted file mode 100644 index a7060da..0000000 --- a/Gruntfile.js +++ /dev/null @@ -1,35 +0,0 @@ -module.exports = function (grunt) { - - require("load-grunt-tasks")(grunt); - - grunt.initConfig({ - typescript: { - index: { - src: ['ts/*.ts'], - dest: 'index.js', - options: { - module: 'commonjs', - sourceMap: true, - rootDir: "ts/", - comments: true, - target: "ES5" - } - } - }, - - clean: { - typescript: ["index.js", "index.js.map"] - } - }); - - grunt.registerTask("build", [ - "newer:typescript" - ]); - - grunt.registerTask("rebuild", [ - "clean", - "build" - ]); - - grunt.registerTask("default", ["build"]); -}; \ No newline at end of file diff --git a/README.md b/README.md index 3106705..a56e229 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ The nodejs sdk for aliyun mns service -[阿里云消息服务中间件-简体中文-帮助手册](https://github.com/InCar/ali-mns/blob/master/README.zh-Hans.md) +[阿里云消息服务-简体中文-帮助手册](https://github.com/InCar/ali-mns/blob/master/README.zh-Hans.md) Ali MNS service is a MQ(message queue) service provided by AliYun. The world largest online sales website www.taobao.com is heavily relying on it. @@ -24,6 +24,7 @@ Use 'npm install ali-mns' to install the package. // send message mq.sendP("Hello ali-mns").then(console.log, console.error); ``` +More sample codes can be found in [GitHub](https://github.com/InCar/ali-mns/tree/master/test). # Promised The ali-mns use the [promise](https://www.npmjs.org/package/promise) pattern. @@ -37,9 +38,234 @@ Visit [http://www.typescriptlang.org/](http://www.typescriptlang.org/) for more If you interest in source file, visit GitHub [https://github.com/InCar/ali-mns](https://github.com/InCar/ali-mns) -Please use 'grunt' to compile ts files into a single index.js file after downloading source files. +Please use 'gulp' to compile ts files into a single index.js file after downloading source files. # API Reference + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
CLASSMETHODDESCRIPTION
[Account](#accountaccountidstring-keyidstring-keysecretstring)The *Account* class store your ali account information.
[getAccountId](#accountgetaccountid)Return the ali account id.
[getOwnerId](#accountgetownerid)Same as account.getAccountId(). For compatible v1.x.
[getKeyId](#accountgetkeyid)Return the ali key id.
[MNS](#mnsaccountaccount-regionstring)
[MQS](#mqsaccountaccount-regionstring)
[MNSTopic](#mnstopicaccountaccount-regionstring)
Operate the mns queue. The *MQS* is for compatible v1.x.
[listP](#mnslistpprefixstring-pagesizenumber-pagemarkerstring)List all of the queue in a data center.
[createP](#mnscreatepnamestring-optionsany)Create a mq.
[deleteP](#mnsdeletepnamestring)Delete an mq.
[MQ](#mqnamestring-accountaccount-regionstring)
[MQBatch](#mqbatch)
The *MQ* operate the message in a queue.
[getName](#mqgetname)Gets the name of mq.
[getAccount](#mqgetaccount)Gets the account of mq.
[getRegion](#mqgetregion)Gets the region of mq.
[sendP](#mqsendpmsgstring-prioritynumber-delaysecondsnumber)Send a message to the queue.
[getRecvTolerance](#mqgetrecvtolerance--mqsetrecvtolerancevaluenumber)Gets the tolerance seconds for mq.recvP method.
[setRecvTolerance](#mqgetrecvtolerance--mqsetrecvtolerancevaluenumber)Sets the tolerance seconds for mq.recvP method.
[recvP](#mqrecvpwaitsecondsnumber)Receive a message from queue.
[peekP](#mqpeekp)Peek a message.
[deleteP](#mqdeletepreceipthandlestring)Delete a message from queue.
[reserveP](#mqreservepreceipthandlestring-reservesecondsnumber)Reserve a received message.
[notifyRecv](#mqnotifyrecvcbexerror-msganyboolean-waitsecondsnumber)Register a callback function to receive messages.
[notifyStopP](#mqnotifystopp)Stop mq.notifyRecv working.
[getAttrsP](#mqgetattrsp)Get the attributes of the mq.
[setAttrsP](#mqsetattrspoptionsany)Modify the attributes of mq.
[MQBatch](#mqbatch)Provide the batch process model introduced in a new edtion of Ali-MNS service in June, 2015.
[sendP](#mqbatchsendpmsgstring--array-prioritynumber-delaysecondsnumber)Send a message or batch send messages to the queue.
[recvP](#mqbatchrecvpwaitsecondsnumber-numofmessagesnumber)Receive a message or batch receive messages from queue.
[peekP](#mqbatchpeekpnumofmessagesnumber)Peek message(s).
[deleteP](#mqbatchdeletepreceipthandlestring--array)Delete a message or messages from queue.
[notifyRecv](#mqbatchnotifyrecvcbexerror-msganyboolean-waitsecondsnumber-numofmessagesnumber)Register a callback function to receive messages in batch
[Msg](#msgmsg-string-prioritynumber-delaysecondsnumber)A simple message define, used in MQBatch.
[getMsg](#msggetmsg)Return the content of message.
[getPriority](#msggetpriority)Return the priority of message.
[getDelaySeconds](#msggetdelayseconds)Return the delay seconds of message.
[MNSTopic](#mnstopicaccountaccount-regionstring)The class MNSTopic extends class MNS for providing features in topic model.
[listTopicP](#mnslisttopicpprefixstring-pagesizenumber-pagemarkerstring)List all topics.
[createTopicP](#mnscreatetopicpnamestring-optionsany)Create a topic.
[deleteTopicP](#mnsdeletetopicpnamestring)Delete a topic.
[Topic](#topicnamestring-accountaccount-regionstring)Operate a topic.
[getName](#topicgetname)Get topic name.
[getAccount](#topicgetaccount)Get topic account.
[getRegion](#topicgetregion)Get topic region.
[getAttrsP](#topicgetattrsp--topicsetattrspoptionsany)Get attributes of topic.
[setAttrsP](#topicgetattrsp--topicsetattrspoptionsany)Set attributes of topic.
[listP](#topiclistpprefixstring-pagesizenumber-pagemarkerstring)List all subscriptions.
[subscribeP](#topicsubscribepnamestring-endpointstring-notifystrategystring-notifycontentformatstring)Subscribe a topic.
[unsubscribeP](#topicunsubscribepnamestring)Unsubscribe a topic.
[publishP](#topicpublishpmsgstring-b64boolean)Publish a message to a topic.
[Subscription](#subscriptionnamestring-topictopic)Operate a subscription.
[getName](#subscriptiongetname)Get name of subscription.
[getTopic](#subscriptiongettopic)Get topic of subscription.
[getAttrsP](#subscriptiongetattrsp--subscriptionsetattrspoptionsany)Get attributes of subscription.
[setAttrsP](#subscriptiongetattrsp--subscriptionsetattrspoptionsany)Set attributes of subscription.
[NotifyStrategy](#subscriptionnotifystrategy)NotifyStrategy constant.
[NotifyContentFormat](#subscriptionnotifycontentformat)NotifyContentFormat constant
+ + ## Account(accountId:string, keyId:string, keySecret:string) The *Account* class store your ali account information. Construct an account object is simple: @@ -149,13 +375,13 @@ Default is "hangzhou". It can also be internal address "hangzhou-internal", "bei var mq = new AliMNS.MQ("myAliMQ", account, "hangzhou"); ``` -## getName() +## mq.getName() Gets the name of mq. -## getAccount() +## mq.getAccount() Gets the account of mq. -## getRegion() +## mq.getRegion() Gets the region of mq. ## mq.sendP(msg:string, priority?:number, delaySeconds?:number) @@ -171,7 +397,7 @@ This argument is prior to the options.DelaySeconds in attributes of message queu mq.sendP("Hello Ali-MNS", 8, 0).then(console.log, console.error); ``` -## getRecvTolerance() & setRecvTolerance(value:number) +## mq.getRecvTolerance() & mq.setRecvTolerance(value:number) Gets or sets the tolerance seconds for mq.recvP method. value: number. Default is 5, in seconds. How long will mq.recvP wait before timeout. @@ -373,6 +599,170 @@ numOfMessages: number. optional. The max number of message can be received in a All other arguments are same as *mq.notifyRecv*. +# MNSTopic(account:Account, region?:string) +The class `MNSTopic` extends class `MNS` for providing features in topic model. +All methods in `MNS` class are also available in `MNSTopic`. +```javascript + var AliMNS = require("ali-mns"); + var account = new AliMNS.Account("", "", ""); + var mns = new AliMNS.MNSTopic(account, "shenzhen"); +``` +*By now(Apr. 2016), the topic model is only provided in shenzhen data center.* + +## mns.listTopicP(prefix?:string, pageSize?:number, pageMarker?:string) +List all topics. + +prefix: String, optional. Return only topics with the prefix. + +pageSize: number, optional. How many topics will be returned in a page, 1~1000, default is 1000. + +pageMarker: String, optional. Request the next page, the value is returned in last call. + +## mns.createTopicP(name:string, options?:any) +Create a topic. + +name: topic name. + +options: optional. + +options.MaximumMessageSize: int. The maximum size of message, 1024(1k)~65536(64k), default is 65536. + +options.LoggingEnabled: boolean. Enable logging or not, default is false. + +## mns.deleteTopicP(name:string) +Delete a topic. + +name: topic name. + +# Topic(name:string, account:Account, region?:string) +Operate a topic. + +name: topic name. + +account: An account object. + +region: optional. Can be "shenzhen" or "shenzhen-internal", default is "hangzhou". + +*By now(Apr. 2016), the topic model is only provided in shenzhen data center* +```javascript +var AliMNS = require("ali-mns"); +var account = new AliMNS.Account("", "", ""); +var topic = new AliMNS.Topic("t11", account, "shenzhen"); +``` + +## topic.getName() +Get topic name. + +## topic.getAccount() +Get topic account. + +## topic.getRegion() +Get topic region. + +## topic.getAttrsP() & topic.setAttrsP(options:any) +Get or set attributes of topic. + +options: topic attributes. + +options.MaximumMessageSize: int. The maximum size of message, 1024(1k)~65536(64k), default is 65536. + +options.LoggingEnabled: boolean. Enable logging or not, default is false. + +```javascript +topic.setAttrsP({ MaximumMessageSize: 1024 }); +topic.getAttrsP().then((data)=>{ console.info(data); }); +``` + +## topic.listP(prefix?:string, pageSize?:number, pageMarker?:string) +List all subscriptions. + +prefix: String, optional. Return only subscriptions with the prefix. + +pageSize: number, optional. How many subscriptions will be returned in a page, 1~1000, default is 1000. + +pageMarker: String, optional. Request the next page, the value is returned in last call. + +## topic.subscribeP(name:string, endPoint:string, notifyStrategy?:string, notifyContentFormat?:string) +Subscribe a topic. + +name: Name of subscription. + +endPoint: Notify end point. eg. `http://www.yoursite.com/mns-ep` + +notifyStrategy: optional. BACKOFF_RETRY or EXPONENTIAL_DECAY_RETRY, default is BACKOFF_RETRY. + +notifyContentFormat: optional. XML or SIMPLIFIED, default is XML. + +```javascript +topic.subscribeP("subx", "http://www.yoursite.com/mns-ep", + AliMNS.Subscription.NotifyStrategy.BACKOFF_RETRY, + AliMNS.Subscription.NotifyContentFormat.SIMPLIFIED) + .then( + (data)=>{ console.info(data);}, + (err)=>{ console.error(err); } + ); +``` + +## topic.unsubscribeP(name:string) +Unsubscribe a topic. + +name: Name of subscription. + +## topic.publishP(msg:string, b64:boolean) +Publish a message to a topic. + +msg: content of message + +b64: true, encoding msg to base64 format before publishing. +false, do not encoding msg before publishing. + +If message contains Chinese characters, must set `b64` to `true`. +Only very simple message can set `b64` to `false`. + +# Subscription(name:string, topic:Topic) +Operate a subscription. +```javascript +var AliMNS = require("ali-mns"); +var account = new AliMNS.Account("", "", ""); +var topic = new AliMNS.Topic("t11", account, "shenzhen"); +var subscription = new AliMNS.Subscription("s12", topic); +``` + +## subscription.getName() +Get name of subscription. + +## subscription.getTopic() +Get topic of subscription. + + +## subscription.getAttrsP() & subscription.setAttrsP(options:any) +Get or set attributes of subscription. + +options: attributes of subscription. + +options.NotifyStrategy: BACKOFF_RETRY or EXPONENTIAL_DECAY_RETRY. +```javascript +subscription.setAttrsP({ NotifyStrategy: AliMNS.Subscription.NotifyStrategy.EXPONENTIAL_DECAY_RETRY }); +``` + +## Subscription.NotifyStrategy +Contains 2 const string. + +AliMNS.Subscription.NotifyStrategy.BACKOFF_RETRY : "BACKOFF_RETRY" + +AliMNS.Subscription.NotifyStrategy.EXPONENTIAL_DECAY_RETRY : "EXPONENTIAL_DECAY_RETRY" + +[More about NotifyStrategy[zh-Hans]](https://help.aliyun.com/document_detail/mns/api_reference/concepts/NotifyStrategy.html?spm=5176.docmns/api_reference/topic_api_spec/subscription_operation.6.141.tmwb5L) + +## Subscription.NotifyContentFormat +Contains 2 const string. + +AliMNS.Subscription.NotifyContentFormat.XML : "XML" + +AliMNS.Subscription.NotifyContentFormat.SIMPLIFIED : "SIMPLIFIED" + +[More about NotifyContentFormat[zh-Hans]](https://help.aliyun.com/document_detail/mns/api_reference/concepts/NotifyContentFormat.html?spm=5176.docmns/api_reference/concepts/NotifyStrategy.6.142.kWiFyy) + # DEBUG Trace Set the environment variable **DEBUG** to "ali-mns" to enable the debug trace output. ```SHELL @@ -434,7 +824,7 @@ It is about **10 times slower** in serial mode than in batch mode. The testing code is in [$/test/performance.js](https://github.com/InCar/ali-mns/blob/master/test/performance.js) and a test log sample is in [$/test/performance.log](https://github.com/InCar/ali-mns/blob/master/test/performance.log) -Needs [mocha](https://www.npmjs.com/package/mocha) module to run the test. +Use `npm run test` to execute the test. Set environment variable **DEBUG** to **ali-mns.test** to turn on output trace(will slow down the test). diff --git a/README.zh-Hans.md b/README.zh-Hans.md index 96f7faa..c6d5197 100644 --- a/README.zh-Hans.md +++ b/README.zh-Hans.md @@ -1,439 +1,826 @@ -# ali-mns (ali-mqs) -[![npm version](https://badge.fury.io/js/ali-mns.svg)](http://badge.fury.io/js/ali-mns) -[![npm version](https://badge.fury.io/js/ali-mqs.svg)](http://badge.fury.io/js/ali-mqs) - -阿里云消息服务(MNS)nodejs软开发包 - -[AliYun-MNS.en-US.README](https://github.com/InCar/ali-mns/blob/master/README.md) - -阿里云消息服务是由阿里云提供的一种消息队列服务中间件. -淘宝网www.taobao.com本身也使用了这种技术. - -访问阿里云消息服务的官方网站 [http://www.aliyun.com/product/mns](http://www.aliyun.com/product/mns) 以了解更多有关阿里云消息服务的详情. - -2015年6月,阿里云使用了新名称Ali-MSN替代了旧的Ali-MQS. -了解如何从旧的版本升级,请访问 [Migrate](#migrate). - -# 快速开始 -使用'npm install ali-mns'来进行安装. - -```javascript - var AliMNS = require("ali-mns"); - var account = new AliMNS.Account("", "", ""); - var mq = new AliMNS.MQ("", account, "hangzhou"); - // send message - mq.sendP("Hello ali-mns").then(console.log, console.error); -``` - -# Promised -ali-mns使用 [promise](https://www.npmjs.org/package/promise) 模式. -所有后缀'P'的方法都会返回一个promise对象. - -# Typescript -如果仅仅只是打算使用它,放心忽略本小节内容. - -绝大多数源代码都用typescript写成,访问 [http://www.typescriptlang.org/](http://www.typescriptlang.org/) 获取更多typescript的知识. - -如果你对源代码感兴趣,访问GitHub [https://github.com/InCar/ali-mns](https://github.com/InCar/ali-mns) - -克隆源代后,使用`grunt`来编译.ts文件. - -# API参考 -## Account(accountId:string, keyId:string, keySecret:string) -*Account*类用于存储你的阿里云帐号信息.创建一个帐号对象很简单: - -accountId: String, 阿里云帐号id. - -keyId: String, 阿里云钥id. - -keySecret: String, 阿里云密钥. -```javascript - var AliMNS = require("ali-mns"); - var account = new AliMNS.Account("", "", ""); -``` -帐号对象通常作为参数被传递给其它类的对象如*MNS*, *MQ* - -猛戳[这里](https://ak-console.aliyun.com/#/accesskey)找到你的阿里云帐号. - -## account.getAccountId() -返回阿里云帐号id. - -## account.getOwnerId() -和account.getAccountId()功能相同. 为了向下兼容v1.x版本. - -## account.getKeyId() -返回阿里钥id. - -## MNS(account:Account, region?:string) -*MNS*类用于操作mns队列. - -account: 阿里云帐号对象. - -region: String, optional. 可能的取值为"hangzhou", "beijing" or "qingdao",分别代表阿里云提供消息服务的3个数据中心. -缺省为"hangzhou".也可以是带有"-internal"后缀的内网形式,如"hangzhou-internal", "beijing-internal" or "qingdao-internal". -```javascript - var AliMNS = require("ali-mns"); - var account = new AliMNS.Account("", "", ""); - var mns = new AliMNS.MNS(account, "hangzhou"); -``` - -## MQS(account:Account, region?:string) -和MNS相同.为了向下兼容v1.x版本. - -## mns.listP(prefix?:string, pageSize?:number, pageMarker?:string) -列出一个数据中心里的所有队列. - -prefix: String, optional. 只返回带有此前缀的队列. - -pageSize: number, optional. 每页返回多少队列,1~1000,缺省1000. - -pageMarker: String, optional. 填入上一次请求中返回的值,来请求下一页. -```javascript - mns.listP("my", 20).then(function(data){ - console.log(data); - return mns.listP("my", 20, data.Queues.NextMarker); - }).then(function(dataP2){ - console.log(dataP2); - }, console.error); -``` - -## mns.createP(name:string, options?:any) -创建一个队列. - -name: String. 队列名称. - -options: optional. 队列属性. - -options.DelaySeconds: number. 消息被发送后经过多少秒才可见.0~604800(7天),缺省0. - -options.MaximumMessageSize: number. 消息最大可以是多少字节.1024(1k)~65536, 缺省是65536(64k). - -options.MessageRetentionPeriod: number. 消息最久可以生存多少秒, 60~129600(15天),缺省是345600(4天). - -optiions.VisibilityTimeout: number. 消息被接收后,保持多少秒不可见,1~43200(12小时). - -options.PollingWaitSeconds: numer. 当消息队列为空时,接收请求最多等待多少秒,0~30,缺省是0. -```javascript - mns.createP("myAliMQ", { - DelaySeconds: 0, - MaximumMessageSize: 65536, - MessageRetentionPeriod: 345600, - VisibilityTimeout: 30, - PollingWaitSeconds: 0 - }).then(console.log, console.error); -``` -如果一个同名队列已经存在,且调用createP方法的所有属性都和现有队列相同,那么调用会返回成功. -任何一个属性不同,会报告"QueueAlreadyExist"(队列已经存在)错误. - -## mns.deleteP(name:string) -删除一个队列. - -name: String. 队列名称. -```javascript - mns.deleteP("myAliMQ").then(console.log, console.error);; -``` - -## MQ(name:string, account:Account, region?:string) -*MQ*操作队列中的消息. - -name: String. 队列名称. - -account: 帐号对象. - -region: String, optional. 可能的取值为"hangzhou", "beijing" or "qingdao",分别代表阿里云提供消息服务的3个数据中心. -缺省为"hangzhou".也可以是带有"-internal"后缀的内网形式,如"hangzhou-internal", "beijing-internal" or "qingdao-internal". -```javascript - var AliMNS = require("ali-mns"); - var account = new AliMNS.Account("", "", ""); - var mq = new AliMNS.MQ("myAliMQ", account, "hangzhou"); -``` - -## mq.getName() -获取队列名称. - -## mq.getAccount() -获取队列帐号. - -## gmq.etRegion() -获取队列位置. - -## mq.sendP(msg:string, priority?:number, delaySeconds?:number) -向队列中发送一个消息. - -message: String. 消息内容. - -priority: number, optional. 优先级1(最低)~16(最高),缺省是8. - -delaySeconds: number, optional. 消息发送多少秒后才可见,0~604800(7天),缺省是0.此参数优先于队列的options.DelaySeconds属性. -```javascript - mq.sendP("Hello Ali-MNS", 8, 0).then(console.log, console.error); -``` - -## mq.getRecvTolerance() & mq.setRecvTolerance(value:number) -获取或设置mq.recvP方法的容忍秒数. - -value: number. Default is 5, in seconds. 缺省是5秒.多久mq.recvP方法会触发超时. -由于网络延迟,mq.recvP方法返回可能会更晚. - -## mq.recvP(waitSeconds?:number) -从队列中接收消息. -它会短暂改变消息的可见性. - -waitSeconds: number. optional. -在返回*MessageNotExist*(消息不存在)错误之前,最大的等待秒数. -```javascript - mq.recvP(5).then(console.log, console.error); -``` -如果队列为空,那么此方法一共会等待`waitSeconds + getRecvTolerance()`秒. - -## mq.peekP() -查探消息. -它不会改变消息的可见性. -```javascript - mq.peekP(5).then(console.log, console.error); -``` - -## mq.deleteP(receiptHandle:string) -删除消息. -消息被接收后,有一个短暂的不可见期.消息在被处理完成后,必须被删除,否则,当不可见期过后,它又能再次被接收. - -receiptHandle: String. 由mq.recvP或mq.notifyRecv返回. -```javascript - mq.recvP(5).then(function(data){ - return mq.deleteP(data.Message.ReceiptHandle); - }).then(console.log, console.error); -``` - -## mq.reserveP(receiptHandle:string, reserveSeconds:number) -保留一个消息. - -receiptHandle: String. 由mq.recvP或mq.notifyRecv返回. - -reserveSeconds: number. 消息保留多少秒,1~43200(12小时). -```javascript - mq.recvP().then(function(data){ - return mq.reserveP(data.Message.ReceiptHandle, 120); - }).then(function(dataReserved){ - return mq.deleteP(dataReserved.ChangeVisibility.ReceiptHandle); - }); -``` -如果处理消息需要花费更多的时间,可以使用这个方法把消息保留更久一些. -从调用此方法开始,消息将继续保持不可见reserveSeconds秒. -也可以设置一个比它的原本不可见期更短的时间. -如果成功,返回一个新的receiptHandle用来取代旧的那个,后续对mq.deleteP或mq.reserveP的调用必须使用这个新的值. -并且,这个新的receiptHandle将会在reserveSeconds秒后过作废. - -## mq.notifyRecv(cb:(ex:Error, msg:any)=>Boolean, waitSeconds?:number) -注册一个回调函数来接收消息. - -cb: 每收到一条消息,注册的回调函数就会被调用一次. -如果回调函数返回*true*,收到的消息会被自动删除. -如果你想自己执行删除操作,那么让回调函数返回*false*. - -waitSeconds: number, optional. 可能的值为1~30.每次轮循最大等待秒数,缺省是5. -在每次轮循的开始,都会检查mq.notifyStopP是否已被调用,所以更大的数值会导致更慢的mq.notifyStopP -设置为0时,会和设置为缺省值5秒的效果相同. -```javascript - mq.notifyRecv(function(err, message){ - console.log(message); - if(err && err.message === "NetworkBroken"){ - // Best to restart the process when this occurs - throw err; - } - return true; // this will cause message to be deleted automatically - }); -``` -如果对同一个队列设置2个不同的回调函数,那么2个回调函数都会起作用. -但一个消息的到来,只会触发它们之中的一个. - -## mq.notifyStopP() -停止mq.notifyRecv.接收循环实际停止时,返回的promise对象才会被解析. -notifyRecv()的最大等待时长由传递给mq.notifyRecv的waitSeconds参数决定. -```javascript - mq.notifyStopP().then(console.log, console.error); -``` - -## mq.getAttrsP() -获取队列的属性. -```javascript -mq.getAttrsP().then(console.log, console.error); -``` - -## mq.setAttrsP(options:any) -修改队列的属性. - -options: the queue attributes. See the [options](#options) of mns.createP. 队列属性,查看mns.createP的[options](#options)参数. -```javascript - mq.setAttrsP({ - DelaySeconds: 0, - MaximumMessageSize: 65536, - MessageRetentionPeriod: 345600, - VisibilityTimeout: 30, - PollingWaitSeconds: 0 - }).then(console.log, console.error); -``` - -# Msg(msg: string, priority?:number, delaySeconds?:number) -简单消息定义,用于MQBatch类. - -msg: string. 消息内容. - -priority: number, optional. 优先级.1(最低)~16(最高). - -delaySeconds: number, optional. 消息发送多少秒后才可见,0~604800(7天),缺省是0.此参数优先于队列的options.DelaySeconds属性. -```javascript -var msg = new AliMNS.Msg("Make a test"); -``` - -## msg.getMsg() -返回消息内容. - - -## msg.getPriority() -返回消息优先级. - -## msg.getDelaySeconds() -返回消息延迟可见秒数. - -# MQBatch -2015年6月,阿里云引入了新的批量消息队列模式.此类派生自MQ类,因此,所有MQ类的方法对MQBatch都适用.例如,你 -可以使用`mqBatch.setRecvTolerance(1.2)`来调节*mqBatch.recvP()*的超时行为. -```javascript -var mqBatch = new AliMNS.MQBatch(aliCfg.mqName, account, aliCfg.region); -``` - -## mqBatch.sendP(msg:string | Array, priority?:number, delaySeconds?:number) -发送一条消息或一批消息. - -msg: String 或 an array of Msg. 一批最大可以发送16个消息. - -priority: number, optional. 优先级,仅在`msg`参数是字符串时有效,1(最低)~16(最高).缺省是8. - -delaySeconds: number, optional. 仅在`msg`参数是字符串时有效,消息发送多少秒后才可见,0~604800(7天),缺省是0.此参数优先于队列的options.DelaySeconds属性. - -如果`msg`是`Msg`的数组,使用`Msg`对象的priority和delaySeconds属性,并忽略第2和第3个参数. -```javascript - var msgs = []; - for(var i=0;i<5;i++){ - var msg = new AliMNS.Msg("BatchSend" + i, 8, 0); - msgs.push(msg); - } - - mqBatch.sendP(msgs); -``` - -## mqBatch.recvP(waitSeconds?:number, numOfMessages?:number) -接收一条或者一批消息. -它会改变消息的可见性. - -waitSeconds: number. optional. 在返回*MessageNotExist*(消息不存在)错误之前,最大的等待秒数. - -numOfMessages: number. optional. 最多一批接收的消息数目,1~16,缺省是16. -```javascript - mqBatch.recvP(5, 16).then(console.log, console.error); -``` - -## mqBatch.peekP(numOfMessages?:number) -查探一条或者一批消息. -它不会改变消息的可见性. - -numOfMessages: number. optional. 最多一批查探的消息数目,1~16,缺省是16. -```javascript - mqBatch.peekP(5, 16).then(console.log, console.error); -``` - -## mqBatch.deleteP(receiptHandle:string | Array) -删除一条或一批消息. -消息被接收后,有一个短暂的不可见期.消息在被处理完成后,必须被删除,否则,当不可见期过后,它又能再次被接收. - -receiptHandle: String 或 an array of string. 由mq.recvP或mq.notifyRecv返回. - -```javascript - var rhsToDel = []; - mqBatch.recvP(5, 16).then(function(dataRecv){ - for(var i=0;iBoolean, waitSeconds?:number, numOfMessages?:number) -注册一个回调函数来接收消息,支持批量模式. - -numOfMessages: number. optional. 最多一批接收的消息数目,1~16,缺省是16. - -所有春它参数都和*mq.notifyRecv*一致. - -# DEBUG Trace -设置环境变量"ali-mns"为**DEBUG**可以开启调试输出. -```SHELL -# linux bash -export DEBUG=ali-mns - -# windows -set DEBUG=ali-mns -``` - -# Migrate -+ 1.The ali-mns is fully compatible with ali-mqs, simply replace the ali-mqs package to ali-mns. -```javascript -// var AliMQS = require('ali-mqs'); -var AliMQS = require('ali-mns'); -``` - -+ 2.Optional. Change the **ownerId** to **accountId** -Ali-Yun upgrade their account system, and recommend to use the newer account id instead of owner id. -But the old owner id is still available for now. -```javascript -var AliMQS = require("ali-mns"); -// var account = new AliMNS.Account("hl35yqoedp", "", ""); -var account = new AliMNS.Account("1786090012649663", "", ""); -``` -**ownerId** is mixed with number and letter - -**accountId** is a 16-digits number, -follow [this link](https://account.console.aliyun.com/#/secure) to find your accountId. - -In GitHub, [An branch v1.x](https://github.com/InCar/ali-mns/tree/v1.x) keeps tracking for the old mqs services. -And use `npm install ali-mqs' to install the [ali-mqs](https://www.npmjs.com/package/ali-mqs) package for v1.x. - -# Performance - Serial vs. Batch 串行和批量的性能对比 -Create 20 queues, then send 2000 messages to them randomly. - -创建20个队列,然后随机的向它们发送共计2000条消息. - -It is about **10 times slower** in serial mode than in batch mode. - -串行模式大约比批量模式慢10倍. - -**1st - Serial Mode(batch_size=1)** -``` -// 20 queues 2000 messages batch_size=1 串行模式 - AliMNS-performance - concurrent-queues - √ #BatchSend (3547ms) - √ #recvP (21605ms) - √ #stopRecv (6075ms) -``` - -**2nd - Batch Mode(Batch_size=16)** -``` -// 20 queues 2000 messages batch_size=16 批量模式(1批16条消息) - AliMNS-performance - concurrent-queues - √ #BatchSend (3472ms) - √ #recvP (2125ms) - √ #stopRecv (6044ms) -``` - -The testing code is in [$/test/performance.js](https://github.com/InCar/ali-mns/blob/master/test/performance.js) -and a test log sample is in [$/test/performance.log](https://github.com/InCar/ali-mns/blob/master/test/performance.log) - -Needs [mocha](https://www.npmjs.com/package/mocha) module to run the test. - -Set environment variable **DEBUG** to **ali-mns.test** to turn on output trace(will slow down the test). - -# License -MIT +# ali-mns (ali-mqs) +[![npm version](https://badge.fury.io/js/ali-mns.svg)](http://badge.fury.io/js/ali-mns) +[![npm version](https://badge.fury.io/js/ali-mqs.svg)](http://badge.fury.io/js/ali-mqs) + +阿里云消息服务(MNS)nodejs软开发包 + +[AliYun-MNS.en-US.README](https://github.com/InCar/ali-mns/blob/master/README.md) + +阿里云消息服务是由阿里云提供的一种消息队列服务中间件. +淘宝网www.taobao.com本身也使用了这种技术. + +访问阿里云消息服务的官方网站 [http://www.aliyun.com/product/mns](http://www.aliyun.com/product/mns) 以了解更多有关阿里云消息服务的详情. + +2015年6月,阿里云使用了新名称Ali-MSN替代了旧的Ali-MQS. +了解如何从旧的版本升级,请访问 [Migrate](#migrate). + +# 快速开始 +使用'npm install ali-mns'来进行安装. + +```javascript + var AliMNS = require("ali-mns"); + var account = new AliMNS.Account("", "", ""); + var mq = new AliMNS.MQ("", account, "hangzhou"); + // send message + mq.sendP("Hello ali-mns").then(console.log, console.error); +``` +更多示例代码可以参考[GitHub](https://github.com/InCar/ali-mns/tree/master/test). + +# Promised +ali-mns使用 [promise](https://www.npmjs.org/package/promise) 模式. +所有后缀'P'的方法都会返回一个promise对象. + +# Typescript +如果仅仅只是打算使用它,放心忽略本小节内容. + +绝大多数源代码都用typescript写成,访问 [http://www.typescriptlang.org/](http://www.typescriptlang.org/) 获取更多typescript的知识. + +如果你对源代码感兴趣,访问GitHub [https://github.com/InCar/ali-mns](https://github.com/InCar/ali-mns) + +克隆源代后,使用`gulp`来编译.ts文件. + +# API参考 +
+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
类型方法简述
[Account](#accountaccountidstring-keyidstring-keysecretstring)*Account*类用于存储你的阿里云帐号信息.
[getAccountId](#accountgetaccountid)返回阿里云帐号id.
[getOwnerId](#accountgetownerid)和account.getAccountId()功能相同. 为了向下兼容v1.x版本.
[getKeyId](#accountgetkeyid)返回阿里钥id.
[MNS](#mnsaccountaccount-regionstring)
[MQS](#mqsaccountaccount-regionstring)
[MNSTopic](#mnstopicaccountaccount-regionstring)
*MNS*类用于操作mns队列. *MQS*和*MNS*相同.为了向下兼容v1.x版本.
[listP](#mnslistpprefixstring-pagesizenumber-pagemarkerstring)列出一个数据中心里的所有队列.
[createP](#mnscreatepnamestring-optionsany)创建一个队列.
[deleteP](#mnsdeletepnamestring)删除一个队列.
[MQ](#mqnamestring-accountaccount-regionstring)
[MQBatch](#mqbatch)
*MQ*操作队列中的消息.
[getName](#mqgetname)获取队列名称.
[getAccount](#mqgetaccount)获取队列帐号.
[getRegion](#mqgetregion)获取队列位置.
[sendP](#mqsendpmsgstring-prioritynumber-delaysecondsnumber)向队列中发送一个消息.
[getRecvTolerance](#mqgetrecvtolerance--mqsetrecvtolerancevaluenumber)获取mq.recvP方法的容忍秒数.
[setRecvTolerance](#mqgetrecvtolerance--mqsetrecvtolerancevaluenumber)设置mq.recvP方法的容忍秒数.
[recvP](#mqrecvpwaitsecondsnumber)从队列中接收消息.
[peekP](#mqpeekp)查探消息.
[deleteP](#mqdeletepreceipthandlestring)删除消息.
[reserveP](#mqreservepreceipthandlestring-reservesecondsnumber)保留一个消息.
[notifyRecv](#mqnotifyrecvcbexerror-msganyboolean-waitsecondsnumber)注册一个回调函数来接收消息.
[notifyStopP](#mqnotifystopp)停止mq.notifyRecv.
[getAttrsP](#mqgetattrsp)获取队列的属性.
[setAttrsP](#mqsetattrspoptionsany)修改队列的属性.
[MQBatch](#mqbatch)批量消息队列
[sendP](#mqbatchsendpmsgstring--array-prioritynumber-delaysecondsnumber)发送一条消息或一批消息.
[recvP](#mqbatchrecvpwaitsecondsnumber-numofmessagesnumber)接收一条或者一批消息.
[peekP](#mqbatchpeekpnumofmessagesnumber)查探一条或者一批消息.
[deleteP](#mqbatchdeletepreceipthandlestring--array)删除一条或一批消息.
[notifyRecv](#mqbatchnotifyrecvcbexerror-msganyboolean-waitsecondsnumber-numofmessagesnumber)注册一个回调函数来接收消息,支持批量模式.
[Msg](#msgmsg-string-prioritynumber-delaysecondsnumber)简单消息定义,用于MQBatch类.
[getMsg](#msggetmsg)返回消息内容.
[getPriority](#msggetpriority)返回消息优先级.
[getDelaySeconds](#msggetdelayseconds)返回消息延迟可见秒数.
[MNSTopic](#mnstopicaccountaccount-regionstring)MNSTopic扩展自MNS,它提供了基于主题模型的消息功能.
[listTopicP](#mnslisttopicpprefixstring-pagesizenumber-pagemarkerstring)列出所有的主题.
[createTopicP](#mnscreatetopicpnamestring-optionsany)创建一个主题.
[deleteTopicP](#mnsdeletetopicpnamestring)删除一个主题.
[Topic](#topicnamestring-accountaccount-regionstring)操控主题
[getName](#topicgetname)获取主题名称.
[getAccount](#topicgetaccount)获取主题帐号.
[getRegion](#topicgetregion)获取主题位置.
[getAttrsP](#topicgetattrsp--topicsetattrspoptionsany)获取主题属性.
[setAttrsP](#topicgetattrsp--topicsetattrspoptionsany)设置主题属性.
[listP](#topiclistpprefixstring-pagesizenumber-pagemarkerstring)列出主题的所有订阅.
[subscribeP](#topicsubscribepnamestring-endpointstring-notifystrategystring-notifycontentformatstring)订阅一个主题.
[unsubscribeP](#topicunsubscribepnamestring)取消对一个主题的订阅.
[publishP](#topicpublishpmsgstring-b64boolean)向主题中发布一个消息.
[Subscription](#subscriptionnamestring-topictopic)操控一个订阅.
[getName](#subscriptiongetname)获取订阅的名称.
[getTopic](#subscriptiongettopic)获取订阅相关的主題.
[getAttrsP](#subscriptiongetattrsp--subscriptionsetattrspoptionsany)获取订阅的属性.
[setAttrsP](#subscriptiongetattrsp--subscriptionsetattrspoptionsany)设置订阅的属性.
[NotifyStrategy](#subscriptionnotifystrategy)通知策略.
[NotifyContentFormat](#subscriptionnotifycontentformat)通知内容格式.
+ + +## Account(accountId:string, keyId:string, keySecret:string) +*Account*类用于存储你的阿里云帐号信息.创建一个帐号对象很简单: + +accountId: String, 阿里云帐号id. + +keyId: String, 阿里云钥id. + +keySecret: String, 阿里云密钥. +```javascript + var AliMNS = require("ali-mns"); + var account = new AliMNS.Account("", "", ""); +``` +帐号对象通常作为参数被传递给其它类的对象如*MNS*, *MQ* + +猛戳[这里](https://ak-console.aliyun.com/#/accesskey)找到你的阿里云帐号. + +## account.getAccountId() +返回阿里云帐号id. + +## account.getOwnerId() +和account.getAccountId()功能相同. 为了向下兼容v1.x版本. + +## account.getKeyId() +返回阿里钥id. + +## MNS(account:Account, region?:string) +*MNS*类用于操作mns队列. + +account: 阿里云帐号对象. + +region: String, optional. 可能的取值为"hangzhou", "beijing" or "qingdao",分别代表阿里云提供消息服务的3个数据中心. +缺省为"hangzhou".也可以是带有"-internal"后缀的内网形式,如"hangzhou-internal", "beijing-internal" or "qingdao-internal". +```javascript + var AliMNS = require("ali-mns"); + var account = new AliMNS.Account("", "", ""); + var mns = new AliMNS.MNS(account, "hangzhou"); +``` + +## MQS(account:Account, region?:string) +和MNS相同.为了向下兼容v1.x版本. + +## mns.listP(prefix?:string, pageSize?:number, pageMarker?:string) +列出一个数据中心里的所有队列. + +prefix: String, optional. 只返回带有此前缀的队列. + +pageSize: number, optional. 每页返回多少队列,1~1000,缺省1000. + +pageMarker: String, optional. 填入上一次请求中返回的值,来请求下一页. +```javascript + mns.listP("my", 20).then(function(data){ + console.log(data); + return mns.listP("my", 20, data.Queues.NextMarker); + }).then(function(dataP2){ + console.log(dataP2); + }, console.error); +``` + +## mns.createP(name:string, options?:any) +创建一个队列. + +name: String. 队列名称. + +options: optional. 队列属性. + +options.DelaySeconds: number. 消息被发送后经过多少秒才可见.0~604800(7天),缺省0. + +options.MaximumMessageSize: number. 消息最大可以是多少字节.1024(1k)~65536, 缺省是65536(64k). + +options.MessageRetentionPeriod: number. 消息最久可以生存多少秒, 60~129600(15天),缺省是345600(4天). + +optiions.VisibilityTimeout: number. 消息被接收后,保持多少秒不可见,1~43200(12小时). + +options.PollingWaitSeconds: numer. 当消息队列为空时,接收请求最多等待多少秒,0~30,缺省是0. +```javascript + mns.createP("myAliMQ", { + DelaySeconds: 0, + MaximumMessageSize: 65536, + MessageRetentionPeriod: 345600, + VisibilityTimeout: 30, + PollingWaitSeconds: 0 + }).then(console.log, console.error); +``` +如果一个同名队列已经存在,且调用createP方法的所有属性都和现有队列相同,那么调用会返回成功. +任何一个属性不同,会报告"QueueAlreadyExist"(队列已经存在)错误. + +## mns.deleteP(name:string) +删除一个队列. + +name: String. 队列名称. +```javascript + mns.deleteP("myAliMQ").then(console.log, console.error);; +``` + +## MQ(name:string, account:Account, region?:string) +*MQ*操作队列中的消息. + +name: String. 队列名称. + +account: 帐号对象. + +region: String, optional. 可能的取值为"hangzhou", "beijing" or "qingdao",分别代表阿里云提供消息服务的3个数据中心. +缺省为"hangzhou".也可以是带有"-internal"后缀的内网形式,如"hangzhou-internal", "beijing-internal" or "qingdao-internal". +```javascript + var AliMNS = require("ali-mns"); + var account = new AliMNS.Account("", "", ""); + var mq = new AliMNS.MQ("myAliMQ", account, "hangzhou"); +``` + +## mq.getName() +获取队列名称. + +## mq.getAccount() +获取队列帐号. + +## mq.getRegion() +获取队列位置. + +## mq.sendP(msg:string, priority?:number, delaySeconds?:number) +向队列中发送一个消息. + +message: String. 消息内容. + +priority: number, optional. 优先级1(最低)~16(最高),缺省是8. + +delaySeconds: number, optional. 消息发送多少秒后才可见,0~604800(7天),缺省是0.此参数优先于队列的options.DelaySeconds属性. +```javascript + mq.sendP("Hello Ali-MNS", 8, 0).then(console.log, console.error); +``` + +## mq.getRecvTolerance() & mq.setRecvTolerance(value:number) +获取或设置mq.recvP方法的容忍秒数. + +value: number. Default is 5, in seconds. 缺省是5秒.多久mq.recvP方法会触发超时. +由于网络延迟,mq.recvP方法返回可能会更晚. + +## mq.recvP(waitSeconds?:number) +从队列中接收消息. +它会短暂改变消息的可见性. + +waitSeconds: number. optional. +在返回*MessageNotExist*(消息不存在)错误之前,最大的等待秒数. +```javascript + mq.recvP(5).then(console.log, console.error); +``` +如果队列为空,那么此方法一共会等待`waitSeconds + getRecvTolerance()`秒. + +## mq.peekP() +查探消息. +它不会改变消息的可见性. +```javascript + mq.peekP(5).then(console.log, console.error); +``` + +## mq.deleteP(receiptHandle:string) +删除消息. +消息被接收后,有一个短暂的不可见期.消息在被处理完成后,必须被删除,否则,当不可见期过后,它又能再次被接收. + +receiptHandle: String. 由mq.recvP或mq.notifyRecv返回. +```javascript + mq.recvP(5).then(function(data){ + return mq.deleteP(data.Message.ReceiptHandle); + }).then(console.log, console.error); +``` + +## mq.reserveP(receiptHandle:string, reserveSeconds:number) +保留一个消息. + +receiptHandle: String. 由mq.recvP或mq.notifyRecv返回. + +reserveSeconds: number. 消息保留多少秒,1~43200(12小时). +```javascript + mq.recvP().then(function(data){ + return mq.reserveP(data.Message.ReceiptHandle, 120); + }).then(function(dataReserved){ + return mq.deleteP(dataReserved.ChangeVisibility.ReceiptHandle); + }); +``` +如果处理消息需要花费更多的时间,可以使用这个方法把消息保留更久一些. +从调用此方法开始,消息将继续保持不可见reserveSeconds秒. +也可以设置一个比它的原本不可见期更短的时间. +如果成功,返回一个新的receiptHandle用来取代旧的那个,后续对mq.deleteP或mq.reserveP的调用必须使用这个新的值. +并且,这个新的receiptHandle将会在reserveSeconds秒后过作废. + +## mq.notifyRecv(cb:(ex:Error, msg:any)=>Boolean, waitSeconds?:number) +注册一个回调函数来接收消息. + +cb: 每收到一条消息,注册的回调函数就会被调用一次. +如果回调函数返回*true*,收到的消息会被自动删除. +如果你想自己执行删除操作,那么让回调函数返回*false*. + +waitSeconds: number, optional. 可能的值为1~30.每次轮循最大等待秒数,缺省是5. +在每次轮循的开始,都会检查mq.notifyStopP是否已被调用,所以更大的数值会导致更慢的mq.notifyStopP +设置为0时,会和设置为缺省值5秒的效果相同. +```javascript + mq.notifyRecv(function(err, message){ + console.log(message); + if(err && err.message === "NetworkBroken"){ + // Best to restart the process when this occurs + throw err; + } + return true; // this will cause message to be deleted automatically + }); +``` +如果对同一个队列设置2个不同的回调函数,那么2个回调函数都会起作用. +但一个消息的到来,只会触发它们之中的一个. + +## mq.notifyStopP() +停止mq.notifyRecv.接收循环实际停止时,返回的promise对象才会被解析. +notifyRecv()的最大等待时长由传递给mq.notifyRecv的waitSeconds参数决定. +```javascript + mq.notifyStopP().then(console.log, console.error); +``` + +## mq.getAttrsP() +获取队列的属性. +```javascript +mq.getAttrsP().then(console.log, console.error); +``` + +## mq.setAttrsP(options:any) +修改队列的属性. + +options: 队列属性,查看mns.createP的[options](#options)参数. +```javascript + mq.setAttrsP({ + DelaySeconds: 0, + MaximumMessageSize: 65536, + MessageRetentionPeriod: 345600, + VisibilityTimeout: 30, + PollingWaitSeconds: 0 + }).then(console.log, console.error); +``` + +# Msg(msg: string, priority?:number, delaySeconds?:number) +简单消息定义,用于MQBatch类. + +msg: string. 消息内容. + +priority: number, optional. 优先级.1(最低)~16(最高). + +delaySeconds: number, optional. 消息发送多少秒后才可见,0~604800(7天),缺省是0.此参数优先于队列的options.DelaySeconds属性. +```javascript +var msg = new AliMNS.Msg("Make a test"); +``` + +## msg.getMsg() +返回消息内容. + + +## msg.getPriority() +返回消息优先级. + +## msg.getDelaySeconds() +返回消息延迟可见秒数. + +# MQBatch +2015年6月,阿里云引入了新的批量消息队列模式.此类派生自MQ类,因此,所有MQ类的方法对MQBatch都适用.例如,你 +可以使用`mqBatch.setRecvTolerance(1.2)`来调节*mqBatch.recvP()*的超时行为. +```javascript +var mqBatch = new AliMNS.MQBatch(aliCfg.mqName, account, aliCfg.region); +``` + +## mqBatch.sendP(msg:string | Array, priority?:number, delaySeconds?:number) +发送一条消息或一批消息. + +msg: String 或 an array of Msg. 一批最大可以发送16个消息. + +priority: number, optional. 优先级,仅在`msg`参数是字符串时有效,1(最低)~16(最高).缺省是8. + +delaySeconds: number, optional. 仅在`msg`参数是字符串时有效,消息发送多少秒后才可见,0~604800(7天),缺省是0.此参数优先于队列的options.DelaySeconds属性. + +如果`msg`是`Msg`的数组,使用`Msg`对象的priority和delaySeconds属性,并忽略第2和第3个参数. +```javascript + var msgs = []; + for(var i=0;i<5;i++){ + var msg = new AliMNS.Msg("BatchSend" + i, 8, 0); + msgs.push(msg); + } + + mqBatch.sendP(msgs); +``` + +## mqBatch.recvP(waitSeconds?:number, numOfMessages?:number) +接收一条或者一批消息. +它会改变消息的可见性. + +waitSeconds: number. optional. 在返回*MessageNotExist*(消息不存在)错误之前,最大的等待秒数. + +numOfMessages: number. optional. 最多一批接收的消息数目,1~16,缺省是16. +```javascript + mqBatch.recvP(5, 16).then(console.log, console.error); +``` + +## mqBatch.peekP(numOfMessages?:number) +查探一条或者一批消息. +它不会改变消息的可见性. + +numOfMessages: number. optional. 最多一批查探的消息数目,1~16,缺省是16. +```javascript + mqBatch.peekP(5, 16).then(console.log, console.error); +``` + +## mqBatch.deleteP(receiptHandle:string | Array) +删除一条或一批消息. +消息被接收后,有一个短暂的不可见期.消息在被处理完成后,必须被删除,否则,当不可见期过后,它又能再次被接收. + +receiptHandle: String 或 an array of string. 由mq.recvP或mq.notifyRecv返回. + +```javascript + var rhsToDel = []; + mqBatch.recvP(5, 16).then(function(dataRecv){ + for(var i=0;iBoolean, waitSeconds?:number, numOfMessages?:number) +注册一个回调函数来接收消息,支持批量模式. + +numOfMessages: number. optional. 最多一批接收的消息数目,1~16,缺省是16. + +所有春它参数都和*mq.notifyRecv*一致. + +# MNSTopic(account:Account, region?:string) +`MNSTopic`提供了关于主题模型的功能,它扩展自`MNS`. +所有`MNS`的方法都适用于`MNSTopic`. +```javascript + var AliMNS = require("ali-mns"); + var account = new AliMNS.Account("", "", ""); + var mns = new AliMNS.MNSTopic(account, "shenzhen"); +``` +*截至目前(2016年4月),主题模型仅在深圳数据中心提供服务* + +## mns.listTopicP(prefix?:string, pageSize?:number, pageMarker?:string) +列出所有的主题. + +prefix: 可选.只返回特定前缀的主题. + +pageSize: 可选.每页包含的主题数目1~1000,缺省为1000. + +pageMarker: 可选.填入上一次请求中返回的值,来请求下一页. + +## mns.createTopicP(name:string, options?:any) +创建一个主题. + +name: 主题名称. + +options: 选项. + +options.MaximumMessageSize: int. 消息的最大尺寸, 1024(1k)~65536(64k), 缺省为65536. + +options.LoggingEnabled: boolean. 是否开启日志记录,缺省是false不开启. + +## mns.deleteTopicP(name:string) +删除一个主题 + +name: 主题名称. + +# Topic(name:string, account:Account, region?:string) +操控一个主题。 + +name: 主题名称. + +account: 主题帐号. + +region: 可选.数据中心,可以是"shenzhen"或"shenzhen-internal", 缺省是"hangzhou". + +*截至目前(2016年4月), 主题模型仅在深圳数据中心提供服务* +```javascript +var AliMNS = require("ali-mns"); +var account = new AliMNS.Account("", "", ""); +var topic = new AliMNS.Topic("t11", account, "shenzhen"); +``` + +## topic.getName() +获取主题名称. + +## topic.getAccount() +获取主题帐号. + +## topic.getRegion() +获取主题数据中心位置. + +## topic.getAttrsP() & topic.setAttrsP(options:any) +获取或设置主题的属性. + +options: 主题属性. + +options.MaximumMessageSize: int. 消息的最大尺寸, 1024(1k)~65536(64k), 缺省为65536. + +options.LoggingEnabled: boolean. 是否开启日志记录,缺省是false不开启. + +```javascript +topic.setAttrsP({ MaximumMessageSize: 1024 }); +topic.getAttrsP().then((data)=>{ console.info(data); }); +``` + +## topic.listP(prefix?:string, pageSize?:number, pageMarker?:string) +列出所有的订阅. + +prefix: 可选.只返回特定前缀的订阅. + +pageSize: 可选.每页包含的订阅数目1~1000,缺省为1000. + +pageMarker: 可选.填入上一次请求中返回的值,来请求下一页. + +## topic.subscribeP(name:string, endPoint:string, notifyStrategy?:string, notifyContentFormat?:string) +订阅一个主题. + +name: 订阅名称. + +endPoint: 通知终端点. 例如: `http://www.yoursite.com/mns-ep` + +notifyStrategy: 可选.通知策略BACKOFF_RETRY或EXPONENTIAL_DECAY_RETRY,缺省是BACKOFF_RETRY. + +notifyContentFormat: 可选.通知消息格式XML或SIMPLIFIED,缺省是XML. + +```javascript +topic.subscribeP("subx", "http://www.yoursite.com/mns-ep", + AliMNS.Subscription.NotifyStrategy.BACKOFF_RETRY, + AliMNS.Subscription.NotifyContentFormat.SIMPLIFIED) + .then( + (data)=>{ console.info(data);}, + (err)=>{ console.error(err); } + ); +``` + +## topic.unsubscribeP(name:string) +取消订阅. + +name: 订阅名称. + +## topic.publishP(msg:string, b64:boolean) +向主题中发布一个消息. + +msg: 消息内容. + +b64: true, 发布消息使用base64编码方式. +false, 发布消息不使用base64编码方式. + +如果消息中包含中文字符,必须把`b64`设置为`true`. +只有非常简单的消息才可以把`b64`设置为`false`. + +# Subscription(name:string, topic:Topic) +操控一个订阅. +```javascript +var AliMNS = require("ali-mns"); +var account = new AliMNS.Account("", "", ""); +var topic = new AliMNS.Topic("t11", account, "shenzhen"); +var subscription = new AliMNS.Subscription("s12", topic); +``` + +## subscription.getName() +获取订阅名称. + +## subscription.getTopic() +获取订阅相关的主题. + + +## subscription.getAttrsP() & subscription.setAttrsP(options:any) +获取或设置订阅属性. + +options: 订阅属性. + +options.NotifyStrategy: 订阅策略 BACKOFF_RETRY或EXPONENTIAL_DECAY_RETRY. +```javascript +subscription.setAttrsP({ NotifyStrategy: AliMNS.Subscription.NotifyStrategy.EXPONENTIAL_DECAY_RETRY }); +``` + +## Subscription.NotifyStrategy +订阅策略,包含2个字符串定义. + +AliMNS.Subscription.NotifyStrategy.BACKOFF_RETRY : "BACKOFF_RETRY" + +AliMNS.Subscription.NotifyStrategy.EXPONENTIAL_DECAY_RETRY : "EXPONENTIAL_DECAY_RETRY" + +[关于订阅策略](https://help.aliyun.com/document_detail/mns/api_reference/concepts/NotifyStrategy.html?spm=5176.docmns/api_reference/topic_api_spec/subscription_operation.6.141.tmwb5L) + +## Subscription.NotifyContentFormat +通知消息格式,包含2个字符串定义. + +AliMNS.Subscription.NotifyContentFormat.XML : "XML" + +AliMNS.Subscription.NotifyContentFormat.SIMPLIFIED : "SIMPLIFIED" + +[关于通知消息格式](https://help.aliyun.com/document_detail/mns/api_reference/concepts/NotifyContentFormat.html?spm=5176.docmns/api_reference/concepts/NotifyStrategy.6.142.kWiFyy) + +# 调式输出 +设置环境变量"ali-mns"为**DEBUG**可以开启调试输出. +```SHELL +# linux bash +export DEBUG=ali-mns + +# windows +set DEBUG=ali-mns +``` + +# Migrate +从1.x版本迁移 ++ 1.ali-mns完全兼容ali-mqs, 简单替换ali-mqs包成ali-mns. +```javascript +// var AliMQS = require('ali-mqs'); +var AliMQS = require('ali-mns'); +``` + ++ 2.可选. 更改**ownerId**为**accountId** +阿里云升级了帐号系统,推荐使用新的account id 取代 owner id. +但旧的owner id目前仍然有效. +```javascript +var AliMQS = require("ali-mns"); +// var account = new AliMNS.Account("hl35yqoedp", "", ""); +var account = new AliMNS.Account("1786090012649663", "", ""); +``` +**ownerId** 混合了字母和数字. + +**accountId** 16位的数字. +点击[这个链接](https://account.console.aliyun.com/#/secure)找到你的account id. + +在GitHub, [分支v1.x](https://github.com/InCar/ali-mns/tree/v1.x) 跟踪旧的mqs服务. +使用`npm install ali-mqs' 安装 [ali-mqs](https://www.npmjs.com/package/ali-mqs) 包的v1.x版本. + +# Performance - Serial vs. Batch 串行和批量的性能对比 +创建20个队列,然后随机的向它们发送共计2000条消息. + +串行模式大约比批量模式慢10倍. + +**1st - Serial Mode(batch_size=1)** +``` +// 20 queues 2000 messages batch_size=1 串行模式 + AliMNS-performance + concurrent-queues + √ #BatchSend (3547ms) + √ #recvP (21605ms) + √ #stopRecv (6075ms) +``` + +**2nd - Batch Mode(Batch_size=16)** +``` +// 20 queues 2000 messages batch_size=16 批量模式(1批16条消息) + AliMNS-performance + concurrent-queues + √ #BatchSend (3472ms) + √ #recvP (2125ms) + √ #stopRecv (6044ms) +``` + +测试代码位于[$/test/performance.js](https://github.com/InCar/ali-mns/blob/master/test/performance.js) +一份测试输出日志示例位于 [$/test/performance.log](https://github.com/InCar/ali-mns/blob/master/test/performance.log) + +执行`npm run test`运行测试. + +设置环境变量 **DEBUG** 为 **ali-mns.test** 开启测试输出(会略微拖慢测试). + +# License +MIT diff --git a/gulpfile.js b/gulpfile.js new file mode 100644 index 0000000..b7f5243 --- /dev/null +++ b/gulpfile.js @@ -0,0 +1,26 @@ +var gulp = require('gulp'); +var newer = require('gulp-newer'); +var ts = require('gulp-typescript'); +var sourcemaps = require('gulp-sourcemaps'); +var del = require('del'); +var debug = require('gulp-debug'); + +var tsProject = ts.createProject('ts/tsconfig.json'); + +gulp.task('build', function(){ + var tsResult = tsProject.src() + .pipe(newer('index.js')) + .pipe(sourcemaps.init()) + .pipe(debug({ title: 'ts: ' })) + .pipe(ts(tsProject)); + return tsResult.js + .pipe(sourcemaps.write('.')) + .pipe(gulp.dest('.')) + .pipe(debug({ title: 'out: ' })); +}); + +gulp.task('clean', function(){ + del(['index.js', 'index.js.map']); +}); + +gulp.task('default', ['build']); \ No newline at end of file diff --git a/index.js b/index.js index 371b41b..f4f6035 100644 --- a/index.js +++ b/index.js @@ -1,654 +1,840 @@ -/// -// dependencies -var Buffer = require("buffer"); -var CryptoA = require("crypto"); -var Events = require("events"); -var Util = require("util"); -var Url = require("url"); -var debug = require("debug")("ali-mns"); -var Promise = require("promise"); -var Request = require("request"); -Request.requestP = Promise.denodeify(Request); -Request.debug = false; -var Xml2js = require("xml2js"); -Xml2js.parseStringP = Promise.denodeify(Xml2js.parseString); -var XmlBuilder = require("xmlbuilder"); -/// -var AliMNS; -(function (AliMNS) { - // The Ali account, it holds the key id and secret. - var Account = (function () { - function Account(accountId, keyId, keySecret) { - this._accountId = accountId; - this._keyId = keyId; - this._keySecret = keySecret; - } - Account.prototype.getAccountId = function () { return this._accountId; }; - Account.prototype.getOwnerId = function () { return this._accountId; }; // for compatible v1.x - Account.prototype.getKeyId = function () { return this._keyId; }; - // encoding: "hex", "binary" or "base64" - Account.prototype.hmac_sha1 = function (text, encoding) { - var hmacSHA1 = CryptoA.createHmac("sha1", this._keySecret); - return hmacSHA1.update(text).digest(encoding); - }; - Account.prototype.b64md5 = function (text) { - var cryptoMD5 = CryptoA.createHash("md5"); - var md5HEX = cryptoMD5.update(text).digest("hex"); - var buf = new Buffer.Buffer(md5HEX, "utf8"); - return buf.toString("base64"); - }; - return Account; - })(); - AliMNS.Account = Account; -})(AliMNS || (AliMNS = {})); -var AliMNS; -(function (AliMNS) { - // The Message class - var Msg = (function () { - function Msg(msg, priority, delaySeconds) { - // message priority - this._priority = 8; - // message delay to visible, in seconds - this._delaySeconds = 0; - this._msg = msg; - if (!isNaN(priority)) - this._priority = priority; - if (!isNaN(delaySeconds)) - this._delaySeconds = delaySeconds; - } - Msg.prototype.getMsg = function () { return this._msg; }; - Msg.prototype.getPriority = function () { return this._priority; }; - Msg.prototype.getDelaySeconds = function () { return this._delaySeconds; }; - return Msg; - })(); - AliMNS.Msg = Msg; -})(AliMNS || (AliMNS = {})); -/// -// The Ali open interface stack -/// -/// -var AliMNS; -(function (AliMNS) { - // the ali open interface stack protocol - var OpenStack = (function () { - function OpenStack(account) { - this._patternMNS = "MNS %s:%s"; - this._patternSign = "%s\n%s\n%s\n%s\n%s%s"; - this._contentType = "text/xml;charset=utf-8"; - this._version = "2015-06-06"; - this._account = account; - // xml builder - this._xmlBuilder = XmlBuilder; - } - // Send the request - // method: GET, POST, PUT, DELETE - // url: request url - // body: optional, request body - // head: optional, request heads - // options: optional, request options - OpenStack.prototype.sendP = function (method, url, body, headers, options) { - var req = { method: method, url: url }; - if (body) - req.body = this._xmlBuilder.create(body).toString(); - req.headers = this.makeHeaders(method, url, headers, req.body); - // combines options - if (options) { - for (var opt in options) { - if (opt === "method" || opt === "url" || opt === "uri" || opt === "body" || opt === "headers") - continue; // skip these options for avoid conflict to other arguments - else if (options.hasOwnProperty(opt)) - req[opt] = options[opt]; - } - } - return Request.requestP(req).then(function (response) { - // convert the body from xml to json - return Xml2js.parseStringP(response.body, { explicitArray: false }) - .then(function (bodyJSON) { - response.bodyJSON = bodyJSON; - return response; - }, function () { - // cannot parse as xml - response.bodyJSON = response.body; - return response; - }); - }).then(function (response) { - if (response.statusCode < 400) { - if (response.bodyJSON) - return response.bodyJSON; - else - return response.statusCode; - } - else { - if (response.bodyJSON) - return Promise.reject(response.bodyJSON); - else - return Promise.reject(response.statusCode); - } - }); - }; - OpenStack.prototype.makeHeaders = function (mothod, url, headers, body) { - // if not exist, create one - if (!headers) - headers = {}; - var contentMD5 = ""; - var contentType = ""; - if (body) { - if (!headers["Content-Length"]) - headers["Content-Length"] = body.length; - if (!headers["Content-Type"]) - headers["Content-Type"] = this._contentType; - contentType = headers["Content-Type"]; - contentMD5 = this._account.b64md5(body); - headers["Content-MD5"] = contentMD5; - } - // `Dat`e & `Host` will be added by request automatically - if (!headers["x-mns-version"]) - headers["x-mns-version"] = this._version; - // lowercase & sort & extract the x-mns- - var headsLower = {}; - var keys = []; - for (var key in headers) { - if (headers.hasOwnProperty(key)) { - var lower = key.toLowerCase(); - keys.push(lower); - headsLower[lower] = headers[key]; - } - } - keys.sort(); - var mnsHeaders = ""; - for (var i in keys) { - var k = keys[i]; - if (k.indexOf("x-mns-") === 0) { - mnsHeaders += Util.format("%s:%s\n", k, headsLower[k]); - } - } - var tm = (new Date()).toUTCString(); - var mnsURL = Url.parse(url); - headers.Date = tm; - headers.Authorization = this.authorize(mothod, mnsURL.path, mnsHeaders, contentType, contentMD5, tm); - headers.Host = mnsURL.host; - return headers; - }; - // ali mns authorize header - OpenStack.prototype.authorize = function (httpVerb, mnsURI, mnsHeaders, contentType, contentMD5, tm) { - return Util.format(this._patternMNS, this._account.getKeyId(), this.signature(httpVerb, mnsURI, mnsHeaders, contentType, contentMD5, tm)); - }; - // ali mns signature - OpenStack.prototype.signature = function (httpVerb, mnsURI, mnsHeaders, contentType, contentMD5, tm) { - var text = Util.format(this._patternSign, httpVerb, contentMD5, contentType, tm, mnsHeaders, mnsURI); - return this._account.hmac_sha1(text, "base64"); - }; - return OpenStack; - })(); - AliMNS.OpenStack = OpenStack; -})(AliMNS || (AliMNS = {})); -/// -/// -/// -var AliMNS; -(function (AliMNS) { - // The MNS can list, create, delete, modify the mq. - var MNS = (function () { - // The constructor. account: ali account; region: can be "hangzhou", "beijing" or "qingdao", default is "hangzhou" - function MNS(account, region) { - this._region = "hangzhou"; // region: hangzhou, beijing, qingdao - this._pattern = "http://%s.mns.cn-%s.aliyuncs.com/queues/"; - // save the input arguments - this._account = account; - if (region) - this._region = region; - // make url - this._url = this.makeURL(); - // create the OpenStack object - this._openStack = new AliMNS.OpenStack(account); - } - // List all mns. - MNS.prototype.listP = function (prefix, pageSize, pageMarker) { - var headers = {}; - if (prefix) - headers["x-mns-prefix"] = prefix; - if (pageMarker) - headers["x-mns-marker"] = pageMarker; - if (pageSize) - headers["x-mns-ret-number"] = pageSize; - var url = this._url.slice(0, -1); - debug("GET " + url); - return this._openStack.sendP("GET", url, null, headers); - }; - // Create a message queue - MNS.prototype.createP = function (name, options) { - var body = { Queue: "" }; - if (options) - body.Queue = options; - var url = Url.resolve(this._url, name); - debug("PUT " + url, body); - return this._openStack.sendP("PUT", url, body); - }; - // Delete a message queue - MNS.prototype.deleteP = function (name) { - var url = Url.resolve(this._url, name); - debug("DELETE " + url); - return this._openStack.sendP("DELETE", url); - }; - MNS.prototype.makeURL = function () { - return Util.format(this._pattern, this._account.getAccountId(), this._region); - }; - return MNS; - })(); - AliMNS.MNS = MNS; - // For compatible v1.x - AliMNS.MQS = MNS; -})(AliMNS || (AliMNS = {})); -/// -/// -var AliMNS; -(function (AliMNS) { - var NotifyRecv = (function () { - function NotifyRecv(mq) { - this._signalSTOP = true; - this._evStopped = "AliMNS_MQ_NOTIFY_STOPPED"; - // 连续timeout计数器 - // 在某种未知的原因下,网络底层链接断了 - // 这时在程序内部的重试无法促使网络重连,以后的重试都是徒劳的 - // 如果连续发生反复重试都依然timeout,那么极有可能已经发生此种情况了 - // 这时抛出NetworkBroken异常 - this._timeoutCount = 0; - this._timeoutMax = 128; - this._mq = mq; - // emitter - this._emitter = new Events.EventEmitter(); - } - // 消息通知.每当有消息收到时,都调用cb回调函数 - // 如果cb返回true,那么将删除消息,否则保留消息 - NotifyRecv.prototype.notifyRecv = function (cb, waitSeconds, numOfMessages) { - this._signalSTOP = false; - this._timeoutCount = 0; - this.notifyRecvInternal(cb, waitSeconds, numOfMessages); - }; - // 停止消息通知 - NotifyRecv.prototype.notifyStopP = function () { - var _this = this; - if (this._signalSTOP) - return Promise.resolve(this._evStopped); - this._signalSTOP = true; - return new Promise(function (resolve) { - _this._emitter.once(_this._evStopped, function () { - resolve(_this._evStopped); - }); - }); - }; - NotifyRecv.prototype.notifyRecvInternal = function (cb, waitSeconds, numOfMessages) { - var _this = this; - // This signal will be triggered by notifyStopP() - if (this._signalSTOP) { - debug("notifyStopped"); - this._emitter.emit(this._evStopped); - return; - } - debug("notifyRecvInternal()"); - try { - var mqBatch = this._mq; - mqBatch.recvP(waitSeconds, numOfMessages).done(function (dataRecv) { - try { - debug(dataRecv); - _this._timeoutCount = 0; - if (cb(null, dataRecv)) { - _this.deleteP(dataRecv) - .done(null, function (ex) { - console.log(ex); - }); - } - } - catch (ex) { - // ignore any ex throw from cb - console.warn(ex); - } - _this.notifyRecvInternal(cb, waitSeconds, numOfMessages); - }, function (ex) { - debug(ex); - if ((!ex.Error) || (ex.Error.Code !== "MessageNotExist")) { - cb(ex, null); - } - if (ex) { - if (ex.message === "timeout") { - _this._timeoutCount++; - if (_this._timeoutCount > _this._timeoutMax) { - // 极度可能网络底层断了 - cb(new Error("NetworkBroken"), null); - } - } - else if (ex.Error && ex.Error.Code === "MessageNotExist") { - _this._timeoutCount = 0; - } - } - process.nextTick(function () { - _this.notifyRecvInternal(cb, waitSeconds, numOfMessages); - }); - }); - } - catch (ex) { - // ignore any ex - console.warn(ex); - // 过5秒重试 - debug("Retry after 5 seconds"); - setTimeout(function () { - _this.notifyRecvInternal(cb, waitSeconds, numOfMessages); - }, 5000); - } - }; - NotifyRecv.prototype.deleteP = function (dataRecv) { - if (dataRecv) { - if (dataRecv.Message) { - return this._mq.deleteP(dataRecv.Message.ReceiptHandle); - } - else if (dataRecv.Messages && dataRecv.Messages.Message) { - var rhs = []; - for (var i = 0; i < dataRecv.Messages.Message.length; i++) { - rhs.push(dataRecv.Messages.Message[i].ReceiptHandle); - } - var mqBatch = this._mq; - return mqBatch.deleteP(rhs); - } - else { - return Promise.resolve(dataRecv); - } - } - else { - return Promise.resolve(dataRecv); - } - }; - return NotifyRecv; - })(); - AliMNS.NotifyRecv = NotifyRecv; -})(AliMNS || (AliMNS = {})); -/// -/// -/// -/// -var AliMNS; -(function (AliMNS) { - // The MQ - var MQ = (function () { - // The constructor. name & account is required. - // region can be "hangzhou", "beijing" or "qingdao", the default is "hangzhou" - function MQ(name, account, region) { - this._notifyRecv = null; - this._recvTolerance = 5; // 接收消息的容忍时间(单位:秒) - this._region = "hangzhou"; - this._pattern = "http://%s.mns.cn-%s.aliyuncs.com/queues/%s"; - this._name = name; - this._account = account; - if (region) - this._region = region; - // make url - this._urlAttr = this.makeAttrURL(); - this._url = this.makeURL(); - // create the OpenStack object - this._openStack = new AliMNS.OpenStack(account); - } - MQ.prototype.getName = function () { return this._name; }; - MQ.prototype.getAccount = function () { return this._account; }; - MQ.prototype.getRegion = function () { return this._region; }; - // 获取MQ的属性值 - MQ.prototype.getAttrsP = function () { - debug("GET " + this._urlAttr); - return this._openStack.sendP("GET", this._urlAttr); - }; - // 设置MQ的属性值 - MQ.prototype.setAttrsP = function (options) { - var body = { Queue: options }; - debug("PUT " + this._urlAttr, body); - return this._openStack.sendP("PUT", this._urlAttr + "?metaoverride=true", body); - }; - // 发送消息 - MQ.prototype.sendP = function (msg, priority, delaySeconds) { - var b64 = this.utf8ToBase64(msg); - var body = { Message: { MessageBody: b64 } }; - if (!isNaN(priority)) - body.Message.Priority = priority; - if (!isNaN(delaySeconds)) - body.Message.DelaySeconds = delaySeconds; - debug("POST " + this._url, body); - return this._openStack.sendP("POST", this._url, body); - }; - // 接收消息容忍时间(秒) - MQ.prototype.getRecvTolerance = function () { return this._recvTolerance; }; - MQ.prototype.setRecvTolerance = function (value) { this._recvTolerance = value; }; - // 接收消息 - // waitSeconds, 最久等待多少秒0~30 - MQ.prototype.recvP = function (waitSeconds) { - var _this = this; - var url = this._url; - if (waitSeconds) - url += "?waitseconds=" + waitSeconds; - debug("GET " + url); - return new Promise(function (resolve, reject) { - // use the timeout mechanism inside the request module - var options = { timeout: 1000 * _this._recvTolerance }; - if (waitSeconds) - options.timeout += (1000 * waitSeconds); - _this._openStack.sendP("GET", url, null, null, options).done(function (data) { - debug(data); - if (data && data.Message && data.Message.MessageBody) { - data.Message.MessageBody = _this.base64ToUtf8(data.Message.MessageBody); - } - resolve(data); - }, function (ex) { - // for compatible with 1.x, still use literal "timeout" - if (ex.code === "ETIMEDOUT") { - var exTimeout = new Error("timeout"); - exTimeout.innerException = ex; - exTimeout.code = ex.code; - reject(exTimeout); - } - else { - reject(ex); - } - }); - }); - }; - // 检查消息 - MQ.prototype.peekP = function () { - var _this = this; - var url = this._url + "?peekonly=true"; - debug("GET " + url); - return this._openStack.sendP("GET", url).then(function (data) { - debug(data); - _this.decodeB64Messages(data); - return data; - }); - }; - // 删除消息 - MQ.prototype.deleteP = function (receiptHandle) { - var url = this._url + "?ReceiptHandle=" + receiptHandle; - debug("DELETE " + url); - return this._openStack.sendP("DELETE", url); - }; - // 保留消息 - MQ.prototype.reserveP = function (receiptHandle, reserveSeconds) { - var url = this._url - + "?ReceiptHandle=" + receiptHandle - + "&VisibilityTimeout=" + reserveSeconds; - debug("PUT " + url); - return this._openStack.sendP("PUT", url); - }; - // 消息通知.每当有消息收到时,都调用cb回调函数 - // 如果cb返回true,那么将删除消息,否则保留消息 - MQ.prototype.notifyRecv = function (cb, waitSeconds) { - // lazy create - if (this._notifyRecv === null) - this._notifyRecv = new AliMNS.NotifyRecv(this); - return this._notifyRecv.notifyRecv(cb, waitSeconds || 5); - }; - // 停止消息通知 - MQ.prototype.notifyStopP = function () { - if (this._notifyRecv === null) - return Promise.resolve(0); - else - return this._notifyRecv.notifyStopP(); - }; - MQ.prototype.utf8ToBase64 = function (src) { - var buf = new Buffer.Buffer(src, 'utf8'); - return buf.toString('base64'); - }; - MQ.prototype.base64ToUtf8 = function (src) { - var buf = new Buffer.Buffer(src, 'base64'); - return buf.toString('utf8'); - }; - MQ.prototype.decodeB64Messages = function (data) { - if (data && data.Message && data.Message.MessageBody) { - data.Message.MessageBody = this.base64ToUtf8(data.Message.MessageBody); - } - }; - MQ.prototype.makeAttrURL = function () { - return Util.format(this._pattern, this._account.getAccountId(), this._region, this._name); - }; - MQ.prototype.makeURL = function () { - return this.makeAttrURL() + "/messages"; - }; - return MQ; - })(); - AliMNS.MQ = MQ; -})(AliMNS || (AliMNS = {})); -/// -/// -/// -var __extends = (this && this.__extends) || function (d, b) { - for (var p in b) if (b.hasOwnProperty(p)) d[p] = b[p]; - function __() { this.constructor = d; } - __.prototype = b.prototype; - d.prototype = new __(); -}; -var AliMNS; -(function (AliMNS) { - var MQBatch = (function (_super) { - __extends(MQBatch, _super); - function MQBatch(name, account, region) { - _super.call(this, name, account, region); - this._notifyRecv = null; - } - MQBatch.prototype.sendP = function (msg, priority, delaySeconds) { - if (typeof msg === "string") { - return _super.prototype.sendP.call(this, msg, priority, delaySeconds); - } - else { - var body = { Messages: { '#list': [] } }; - for (var i = 0; i < msg.length; i++) { - var m = msg[i]; - var b64 = this.utf8ToBase64(m.getMsg()); - var xMsg = { Message: { MessageBody: b64 } }; - xMsg.Message.Priority = m.getPriority(); - xMsg.Message.DelaySeconds = m.getDelaySeconds(); - body.Messages['#list'].push(xMsg); - } - debug("POST " + this._url, body); - return this._openStack.sendP("POST", this._url, body); - } - }; - MQBatch.prototype.recvP = function (waitSeconds, numOfMessages) { - if (numOfMessages === undefined) - numOfMessages = 16; - if (numOfMessages && numOfMessages > 1) { - var _this = this; - var url = this._url; - url += "?numOfMessages=" + numOfMessages; - if (waitSeconds) - url += "&waitseconds=" + waitSeconds; - debug("GET " + url); - return new Promise(function (resolve, reject) { - // use the timeout mechanism inside the request module - var options = { timeout: 1000 * _this._recvTolerance }; - if (waitSeconds) - options.timeout += (1000 * waitSeconds); - _this._openStack.sendP("GET", url, null, null, options).done(function (data) { - debug(data); - _this.decodeB64Messages(data); - resolve(data); - }, function (ex) { - // for compatible with 1.x, still use literal "timeout" - if (ex.code === "ETIMEDOUT") { - var exTimeout = new Error("timeout"); - exTimeout.innerException = ex; - exTimeout.code = ex.code; - reject(exTimeout); - } - else { - reject(ex); - } - }); - }); - } - else { - return _super.prototype.recvP.call(this, waitSeconds); - } - }; - MQBatch.prototype.peekP = function (numOfMessages) { - if (numOfMessages === undefined) - numOfMessages = 16; - if (numOfMessages && numOfMessages > 1) { - var _this = this; - var url = this._url + "?peekonly=true"; - url += "&numOfMessages=" + numOfMessages; - debug("GET " + url); - return this._openStack.sendP("GET", url).then(function (data) { - debug(data); - _this.decodeB64Messages(data); - return data; - }); - } - else { - return _super.prototype.peekP.call(this); - } - }; - MQBatch.prototype.deleteP = function (receiptHandle) { - if (typeof receiptHandle === "string") { - _super.prototype.deleteP.call(this, receiptHandle); - } - else { - debug("DELETE " + this._url, receiptHandle); - var body = { ReceiptHandles: { '#list': [] } }; - for (var i = 0; i < receiptHandle.length; i++) { - var r = { ReceiptHandle: receiptHandle[i] }; - body.ReceiptHandles['#list'].push(r); - } - return this._openStack.sendP("DELETE", this._url, body); - } - }; - // 消息通知.每当有消息收到时,都调用cb回调函数 - // 如果cb返回true,那么将删除消息,否则保留消息 - MQBatch.prototype.notifyRecv = function (cb, waitSeconds, numOfMessages) { - // lazy create - if (this._notifyRecv === null) - this._notifyRecv = new AliMNS.NotifyRecv(this); - return this._notifyRecv.notifyRecv(cb, waitSeconds || 5, numOfMessages || 16); - }; - MQBatch.prototype.decodeB64Messages = function (data) { - if (data && data.Messages && data.Messages.Message) { - if (!Util.isArray(data.Messages.Message)) { - // Just a single message, use an array to hold it - var msg = data.Messages.Message; - data.Messages.Message = [msg]; - } - for (var i = 0; i < data.Messages.Message.length; i++) { - var msg = data.Messages.Message[i]; - if (msg.MessageBody) - msg.MessageBody = this.base64ToUtf8(msg.MessageBody); - } - } - else { - _super.prototype.decodeB64Messages.call(this, data); - } - }; - return MQBatch; - })(AliMNS.MQ); - AliMNS.MQBatch = MQBatch; -})(AliMNS || (AliMNS = {})); -/// -/// -/// -/// -/// -/// -// Exports the AliMNS -module.exports = AliMNS; -//# sourceMappingURL=index.js.map \ No newline at end of file +var __extends = (this && this.__extends) || function (d, b) { + for (var p in b) if (b.hasOwnProperty(p)) d[p] = b[p]; + function __() { this.constructor = d; } + d.prototype = b === null ? Object.create(b) : (__.prototype = b.prototype, new __()); +}; +/// +// dependencies +var Buffer = require("buffer"); +var CryptoA = require("crypto"); +var Events = require("events"); +var Util = require("util"); +var Url = require("url"); +var debug = require("debug")("ali-mns"); +var Promise = require("promise"); +var Request = require("request"); +Request.requestP = Promise.denodeify(Request); +Request.debug = false; +var Xml2js = require("xml2js"); +Xml2js.parseStringP = Promise.denodeify(Xml2js.parseString); +var XmlBuilder = require("xmlbuilder"); +/// +var AliMNS; +(function (AliMNS) { + // The Ali account, it holds the key id and secret. + var Account = (function () { + function Account(accountId, keyId, keySecret) { + this._accountId = accountId; + this._keyId = keyId; + this._keySecret = keySecret; + } + Account.prototype.getAccountId = function () { return this._accountId; }; + Account.prototype.getOwnerId = function () { return this._accountId; }; // for compatible v1.x + Account.prototype.getKeyId = function () { return this._keyId; }; + // encoding: "hex", "binary" or "base64" + Account.prototype.hmac_sha1 = function (text, encoding) { + var hmacSHA1 = CryptoA.createHmac("sha1", this._keySecret); + return hmacSHA1.update(text).digest(encoding); + }; + Account.prototype.b64md5 = function (text) { + var cryptoMD5 = CryptoA.createHash("md5"); + var md5HEX = cryptoMD5.update(text).digest("hex"); + var buf = new Buffer.Buffer(md5HEX, "utf8"); + return buf.toString("base64"); + }; + return Account; + }()); + AliMNS.Account = Account; +})(AliMNS || (AliMNS = {})); +var AliMNS; +(function (AliMNS) { + // The Message class + var Msg = (function () { + function Msg(msg, priority, delaySeconds) { + // message priority + this._priority = 8; + // message delay to visible, in seconds + this._delaySeconds = 0; + this._msg = msg; + if (!isNaN(priority)) + this._priority = priority; + if (!isNaN(delaySeconds)) + this._delaySeconds = delaySeconds; + } + Msg.prototype.getMsg = function () { return this._msg; }; + Msg.prototype.getPriority = function () { return this._priority; }; + Msg.prototype.getDelaySeconds = function () { return this._delaySeconds; }; + return Msg; + }()); + AliMNS.Msg = Msg; +})(AliMNS || (AliMNS = {})); +/// +// The Ali open interface stack +/// +/// +var AliMNS; +(function (AliMNS) { + // the ali open interface stack protocol + var OpenStack = (function () { + function OpenStack(account) { + this._patternMNS = "MNS %s:%s"; + this._patternSign = "%s\n%s\n%s\n%s\n%s%s"; + this._contentType = "text/xml;charset=utf-8"; + this._version = "2015-06-06"; + this._account = account; + // xml builder + this._xmlBuilder = XmlBuilder; + } + // Send the request + // method: GET, POST, PUT, DELETE + // url: request url + // body: optional, request body + // head: optional, request heads + // options: optional, request options + OpenStack.prototype.sendP = function (method, url, body, headers, options) { + var req = { method: method, url: url }; + if (body) + req.body = this._xmlBuilder.create(body).toString(); + req.headers = this.makeHeaders(method, url, headers, req.body); + // combines options + if (options) { + for (var opt in options) { + if (opt === "method" || opt === "url" || opt === "uri" || opt === "body" || opt === "headers") + continue; // skip these options for avoid conflict to other arguments + else if (options.hasOwnProperty(opt)) + req[opt] = options[opt]; + } + } + return Request.requestP(req).then(function (response) { + // convert the body from xml to json + return Xml2js.parseStringP(response.body, { explicitArray: false }) + .then(function (bodyJSON) { + response.bodyJSON = bodyJSON; + return response; + }, function () { + // cannot parse as xml + response.bodyJSON = response.body; + return response; + }); + }).then(function (response) { + if (response.statusCode < 400) { + if (response.bodyJSON) + return response.bodyJSON; + else + return response.statusCode; + } + else { + if (response.bodyJSON) + return Promise.reject(response.bodyJSON); + else + return Promise.reject(response.statusCode); + } + }); + }; + OpenStack.prototype.makeHeaders = function (mothod, url, headers, body) { + // if not exist, create one + if (!headers) + headers = {}; + var contentMD5 = ""; + var contentType = ""; + if (body) { + if (!headers["Content-Length"]) + headers["Content-Length"] = body.length; + if (!headers["Content-Type"]) + headers["Content-Type"] = this._contentType; + contentType = headers["Content-Type"]; + contentMD5 = this._account.b64md5(body); + headers["Content-MD5"] = contentMD5; + } + // `Date` & `Host` will be added by request automatically + if (!headers["x-mns-version"]) + headers["x-mns-version"] = this._version; + // lowercase & sort & extract the x-mns- + var headsLower = {}; + var keys = []; + for (var key in headers) { + if (headers.hasOwnProperty(key)) { + var lower = key.toLowerCase(); + keys.push(lower); + headsLower[lower] = headers[key]; + } + } + keys.sort(); + var mnsHeaders = ""; + for (var i in keys) { + var k = keys[i]; + if (k.indexOf("x-mns-") === 0) { + mnsHeaders += Util.format("%s:%s\n", k, headsLower[k]); + } + } + var tm = (new Date()).toUTCString(); + var mnsURL = Url.parse(url); + headers.Date = tm; + headers.Authorization = this.authorize(mothod, mnsURL.path, mnsHeaders, contentType, contentMD5, tm); + headers.Host = mnsURL.host; + return headers; + }; + // ali mns authorize header + OpenStack.prototype.authorize = function (httpVerb, mnsURI, mnsHeaders, contentType, contentMD5, tm) { + return Util.format(this._patternMNS, this._account.getKeyId(), this.signature(httpVerb, mnsURI, mnsHeaders, contentType, contentMD5, tm)); + }; + // ali mns signature + OpenStack.prototype.signature = function (httpVerb, mnsURI, mnsHeaders, contentType, contentMD5, tm) { + var text = Util.format(this._patternSign, httpVerb, contentMD5, contentType, tm, mnsHeaders, mnsURI); + return this._account.hmac_sha1(text, "base64"); + }; + return OpenStack; + }()); + AliMNS.OpenStack = OpenStack; +})(AliMNS || (AliMNS = {})); +/// +/// +/// +var AliMNS; +(function (AliMNS) { + // The MNS can list, create, delete, modify the mq. + var MNS = (function () { + // The constructor. account: ali account; region: can be "hangzhou", "beijing" or "qingdao", default is "hangzhou" + function MNS(account, region) { + this._region = "hangzhou"; // region: hangzhou, beijing, qingdao + this._pattern = "http://%s.mns.cn-%s.aliyuncs.com/queues/"; + // save the input arguments + this._account = account; + if (region) + this._region = region; + // make url + this._url = this.makeURL(); + // create the OpenStack object + this._openStack = new AliMNS.OpenStack(account); + } + // List all mns. + MNS.prototype.listP = function (prefix, pageSize, pageMarker) { + var headers = {}; + if (prefix) + headers["x-mns-prefix"] = prefix; + if (pageMarker) + headers["x-mns-marker"] = pageMarker; + if (pageSize) + headers["x-mns-ret-number"] = pageSize; + var url = this._url.slice(0, -1); + debug("GET " + url); + return this._openStack.sendP("GET", url, null, headers); + }; + // Create a message queue + MNS.prototype.createP = function (name, options) { + var body = { Queue: "" }; + if (options) + body.Queue = options; + var url = Url.resolve(this._url, name); + debug("PUT " + url, body); + return this._openStack.sendP("PUT", url, body); + }; + // Delete a message queue + MNS.prototype.deleteP = function (name) { + var url = Url.resolve(this._url, name); + debug("DELETE " + url); + return this._openStack.sendP("DELETE", url); + }; + MNS.prototype.makeURL = function () { + return Util.format(this._pattern, this._account.getAccountId(), this._region); + }; + return MNS; + }()); + AliMNS.MNS = MNS; + // For compatible v1.x + AliMNS.MQS = MNS; +})(AliMNS || (AliMNS = {})); +/// +/// +var AliMNS; +(function (AliMNS) { + var NotifyRecv = (function () { + function NotifyRecv(mq) { + this._signalSTOP = true; + this._evStopped = "AliMNS_MQ_NOTIFY_STOPPED"; + // 连续timeout计数器 + // 在某种未知的原因下,网络底层链接断了 + // 这时在程序内部的重试无法促使网络重连,以后的重试都是徒劳的 + // 如果连续发生反复重试都依然timeout,那么极有可能已经发生此种情况了 + // 这时抛出NetworkBroken异常 + this._timeoutCount = 0; + this._timeoutMax = 128; + this._mq = mq; + // emitter + this._emitter = new Events.EventEmitter(); + } + // 消息通知.每当有消息收到时,都调用cb回调函数 + // 如果cb返回true,那么将删除消息,否则保留消息 + NotifyRecv.prototype.notifyRecv = function (cb, waitSeconds, numOfMessages) { + this._signalSTOP = false; + this._timeoutCount = 0; + this.notifyRecvInternal(cb, waitSeconds, numOfMessages); + }; + // 停止消息通知 + NotifyRecv.prototype.notifyStopP = function () { + var _this = this; + if (this._signalSTOP) + return Promise.resolve(this._evStopped); + this._signalSTOP = true; + return new Promise(function (resolve) { + _this._emitter.once(_this._evStopped, function () { + resolve(_this._evStopped); + }); + }); + }; + NotifyRecv.prototype.notifyRecvInternal = function (cb, waitSeconds, numOfMessages) { + var _this = this; + // This signal will be triggered by notifyStopP() + if (this._signalSTOP) { + debug("notifyStopped"); + this._emitter.emit(this._evStopped); + return; + } + debug("notifyRecvInternal()"); + try { + var mqBatch = this._mq; + mqBatch.recvP(waitSeconds, numOfMessages).done(function (dataRecv) { + try { + debug(dataRecv); + _this._timeoutCount = 0; + if (cb(null, dataRecv)) { + _this.deleteP(dataRecv) + .done(null, function (ex) { + console.log(ex); + }); + } + } + catch (ex) { + // ignore any ex throw from cb + console.warn(ex); + } + _this.notifyRecvInternal(cb, waitSeconds, numOfMessages); + }, function (ex) { + debug(ex); + if ((!ex.Error) || (ex.Error.Code !== "MessageNotExist")) { + cb(ex, null); + } + if (ex) { + if (ex.message === "timeout") { + _this._timeoutCount++; + if (_this._timeoutCount > _this._timeoutMax) { + // 极度可能网络底层断了 + cb(new Error("NetworkBroken"), null); + } + } + else if (ex.Error && ex.Error.Code === "MessageNotExist") { + _this._timeoutCount = 0; + } + } + process.nextTick(function () { + _this.notifyRecvInternal(cb, waitSeconds, numOfMessages); + }); + }); + } + catch (ex) { + // ignore any ex + console.warn(ex); + // 过5秒重试 + debug("Retry after 5 seconds"); + setTimeout(function () { + _this.notifyRecvInternal(cb, waitSeconds, numOfMessages); + }, 5000); + } + }; + NotifyRecv.prototype.deleteP = function (dataRecv) { + if (dataRecv) { + if (dataRecv.Message) { + return this._mq.deleteP(dataRecv.Message.ReceiptHandle); + } + else if (dataRecv.Messages && dataRecv.Messages.Message) { + var rhs = []; + for (var i = 0; i < dataRecv.Messages.Message.length; i++) { + rhs.push(dataRecv.Messages.Message[i].ReceiptHandle); + } + var mqBatch = this._mq; + return mqBatch.deleteP(rhs); + } + else { + return Promise.resolve(dataRecv); + } + } + else { + return Promise.resolve(dataRecv); + } + }; + return NotifyRecv; + }()); + AliMNS.NotifyRecv = NotifyRecv; +})(AliMNS || (AliMNS = {})); +/// +/// +/// +/// +var AliMNS; +(function (AliMNS) { + // The MQ + var MQ = (function () { + // The constructor. name & account is required. + // region can be "hangzhou", "beijing" or "qingdao", the default is "hangzhou" + function MQ(name, account, region) { + this._notifyRecv = null; + this._recvTolerance = 5; // 接收消息的容忍时间(单位:秒) + this._region = "hangzhou"; + this._pattern = "http://%s.mns.cn-%s.aliyuncs.com/queues/%s"; + this._name = name; + this._account = account; + if (region) + this._region = region; + // make url + this._urlAttr = this.makeAttrURL(); + this._url = this.makeURL(); + // create the OpenStack object + this._openStack = new AliMNS.OpenStack(account); + } + MQ.prototype.getName = function () { return this._name; }; + MQ.prototype.getAccount = function () { return this._account; }; + MQ.prototype.getRegion = function () { return this._region; }; + // 获取MQ的属性值 + MQ.prototype.getAttrsP = function () { + debug("GET " + this._urlAttr); + return this._openStack.sendP("GET", this._urlAttr); + }; + // 设置MQ的属性值 + MQ.prototype.setAttrsP = function (options) { + var body = { Queue: options }; + debug("PUT " + this._urlAttr, body); + return this._openStack.sendP("PUT", this._urlAttr + "?metaoverride=true", body); + }; + // 发送消息 + MQ.prototype.sendP = function (msg, priority, delaySeconds) { + var b64 = this.utf8ToBase64(msg); + var body = { Message: { MessageBody: b64 } }; + if (!isNaN(priority)) + body.Message.Priority = priority; + if (!isNaN(delaySeconds)) + body.Message.DelaySeconds = delaySeconds; + debug("POST " + this._url, body); + return this._openStack.sendP("POST", this._url, body); + }; + // 接收消息容忍时间(秒) + MQ.prototype.getRecvTolerance = function () { return this._recvTolerance; }; + MQ.prototype.setRecvTolerance = function (value) { this._recvTolerance = value; }; + // 接收消息 + // waitSeconds, 最久等待多少秒0~30 + MQ.prototype.recvP = function (waitSeconds) { + var _this = this; + var url = this._url; + if (waitSeconds) + url += "?waitseconds=" + waitSeconds; + debug("GET " + url); + return new Promise(function (resolve, reject) { + // use the timeout mechanism inside the request module + var options = { timeout: 1000 * _this._recvTolerance }; + if (waitSeconds) + options.timeout += (1000 * waitSeconds); + _this._openStack.sendP("GET", url, null, null, options).done(function (data) { + debug(data); + if (data && data.Message && data.Message.MessageBody) { + data.Message.MessageBody = _this.base64ToUtf8(data.Message.MessageBody); + } + resolve(data); + }, function (ex) { + // for compatible with 1.x, still use literal "timeout" + if (ex.code === "ETIMEDOUT") { + var exTimeout = new Error("timeout"); + exTimeout.innerException = ex; + exTimeout.code = ex.code; + reject(exTimeout); + } + else { + reject(ex); + } + }); + }); + }; + // 检查消息 + MQ.prototype.peekP = function () { + var _this = this; + var url = this._url + "?peekonly=true"; + debug("GET " + url); + return this._openStack.sendP("GET", url).then(function (data) { + debug(data); + _this.decodeB64Messages(data); + return data; + }); + }; + // 删除消息 + MQ.prototype.deleteP = function (receiptHandle) { + var url = this._url + "?ReceiptHandle=" + receiptHandle; + debug("DELETE " + url); + return this._openStack.sendP("DELETE", url); + }; + // 保留消息 + MQ.prototype.reserveP = function (receiptHandle, reserveSeconds) { + var url = this._url + + "?ReceiptHandle=" + receiptHandle + + "&VisibilityTimeout=" + reserveSeconds; + debug("PUT " + url); + return this._openStack.sendP("PUT", url); + }; + // 消息通知.每当有消息收到时,都调用cb回调函数 + // 如果cb返回true,那么将删除消息,否则保留消息 + MQ.prototype.notifyRecv = function (cb, waitSeconds) { + // lazy create + if (this._notifyRecv === null) + this._notifyRecv = new AliMNS.NotifyRecv(this); + return this._notifyRecv.notifyRecv(cb, waitSeconds || 5); + }; + // 停止消息通知 + MQ.prototype.notifyStopP = function () { + if (this._notifyRecv === null) + return Promise.resolve(0); + else + return this._notifyRecv.notifyStopP(); + }; + MQ.prototype.utf8ToBase64 = function (src) { + var buf = new Buffer.Buffer(src, 'utf8'); + return buf.toString('base64'); + }; + MQ.prototype.base64ToUtf8 = function (src) { + var buf = new Buffer.Buffer(src, 'base64'); + return buf.toString('utf8'); + }; + MQ.prototype.decodeB64Messages = function (data) { + if (data && data.Message && data.Message.MessageBody) { + data.Message.MessageBody = this.base64ToUtf8(data.Message.MessageBody); + } + }; + MQ.prototype.makeAttrURL = function () { + return Util.format(this._pattern, this._account.getAccountId(), this._region, this._name); + }; + MQ.prototype.makeURL = function () { + return this.makeAttrURL() + "/messages"; + }; + return MQ; + }()); + AliMNS.MQ = MQ; +})(AliMNS || (AliMNS = {})); +/// +/// +/// +var AliMNS; +(function (AliMNS) { + var MQBatch = (function (_super) { + __extends(MQBatch, _super); + function MQBatch(name, account, region) { + _super.call(this, name, account, region); + this._notifyRecv = null; + } + MQBatch.prototype.sendP = function (msg, priority, delaySeconds) { + if (typeof msg === "string") { + return _super.prototype.sendP.call(this, msg, priority, delaySeconds); + } + else { + var body = { Messages: { '#list': [] } }; + for (var i = 0; i < msg.length; i++) { + var m = msg[i]; + var b64 = this.utf8ToBase64(m.getMsg()); + var xMsg = { Message: { MessageBody: b64 } }; + xMsg.Message.Priority = m.getPriority(); + xMsg.Message.DelaySeconds = m.getDelaySeconds(); + body.Messages['#list'].push(xMsg); + } + debug("POST " + this._url, body); + return this._openStack.sendP("POST", this._url, body); + } + }; + MQBatch.prototype.recvP = function (waitSeconds, numOfMessages) { + if (numOfMessages === undefined) + numOfMessages = 16; + if (numOfMessages && numOfMessages > 1) { + var _this = this; + var url = this._url; + url += "?numOfMessages=" + numOfMessages; + if (waitSeconds) + url += "&waitseconds=" + waitSeconds; + debug("GET " + url); + return new Promise(function (resolve, reject) { + // use the timeout mechanism inside the request module + var options = { timeout: 1000 * _this._recvTolerance }; + if (waitSeconds) + options.timeout += (1000 * waitSeconds); + _this._openStack.sendP("GET", url, null, null, options).done(function (data) { + debug(data); + _this.decodeB64Messages(data); + resolve(data); + }, function (ex) { + // for compatible with 1.x, still use literal "timeout" + if (ex.code === "ETIMEDOUT") { + var exTimeout = new Error("timeout"); + exTimeout.innerException = ex; + exTimeout.code = ex.code; + reject(exTimeout); + } + else { + reject(ex); + } + }); + }); + } + else { + return _super.prototype.recvP.call(this, waitSeconds); + } + }; + MQBatch.prototype.peekP = function (numOfMessages) { + if (numOfMessages === undefined) + numOfMessages = 16; + if (numOfMessages && numOfMessages > 1) { + var _this = this; + var url = this._url + "?peekonly=true"; + url += "&numOfMessages=" + numOfMessages; + debug("GET " + url); + return this._openStack.sendP("GET", url).then(function (data) { + debug(data); + _this.decodeB64Messages(data); + return data; + }); + } + else { + return _super.prototype.peekP.call(this); + } + }; + MQBatch.prototype.deleteP = function (receiptHandle) { + if (typeof receiptHandle === "string") { + _super.prototype.deleteP.call(this, receiptHandle); + } + else { + debug("DELETE " + this._url, receiptHandle); + var body = { ReceiptHandles: { '#list': [] } }; + for (var i = 0; i < receiptHandle.length; i++) { + var r = { ReceiptHandle: receiptHandle[i] }; + body.ReceiptHandles['#list'].push(r); + } + return this._openStack.sendP("DELETE", this._url, body); + } + }; + // 消息通知.每当有消息收到时,都调用cb回调函数 + // 如果cb返回true,那么将删除消息,否则保留消息 + MQBatch.prototype.notifyRecv = function (cb, waitSeconds, numOfMessages) { + // lazy create + if (this._notifyRecv === null) + this._notifyRecv = new AliMNS.NotifyRecv(this); + return this._notifyRecv.notifyRecv(cb, waitSeconds || 5, numOfMessages || 16); + }; + MQBatch.prototype.decodeB64Messages = function (data) { + if (data && data.Messages && data.Messages.Message) { + if (!Util.isArray(data.Messages.Message)) { + // Just a single message, use an array to hold it + var msg = data.Messages.Message; + data.Messages.Message = [msg]; + } + for (var i = 0; i < data.Messages.Message.length; i++) { + var msg = data.Messages.Message[i]; + if (msg.MessageBody) + msg.MessageBody = this.base64ToUtf8(msg.MessageBody); + } + } + else { + _super.prototype.decodeB64Messages.call(this, data); + } + }; + return MQBatch; + }(AliMNS.MQ)); + AliMNS.MQBatch = MQBatch; +})(AliMNS || (AliMNS = {})); +/// +/// +/// +/// +/// +/// +// Exports the AliMNS +module.exports = AliMNS; +/// +var AliMNS; +(function (AliMNS) { + var MNSTopic = (function (_super) { + __extends(MNSTopic, _super); + function MNSTopic(account, region) { + _super.call(this, account, region); + this._patternTopic = "http://%s.mns.cn-%s.aliyuncs.com/topics/"; + // make url + this._urlTopic = this.makeTopicURL(); + } + // List all topics. + MNSTopic.prototype.listTopicP = function (prefix, pageSize, pageMarker) { + var headers = {}; + if (prefix) + headers["x-mns-prefix"] = prefix; + if (pageMarker) + headers["x-mns-marker"] = pageMarker; + if (pageSize) + headers["x-mns-ret-number"] = pageSize; + var url = this._urlTopic.slice(0, -1); + debug("GET " + url); + return this._openStack.sendP("GET", url, null, headers); + }; + // Create a topic + MNSTopic.prototype.createTopicP = function (name, options) { + var body = { Topic: "" }; + if (options) + body.Topic = options; + var url = Url.resolve(this._urlTopic, name); + debug("PUT " + url, body); + return this._openStack.sendP("PUT", url, body); + }; + // Delete a topic + MNSTopic.prototype.deleteTopicP = function (name) { + var url = Url.resolve(this._urlTopic, name); + debug("DELETE " + url); + return this._openStack.sendP("DELETE", url); + }; + MNSTopic.prototype.makeTopicURL = function () { + return Util.format(this._patternTopic, this._account.getAccountId(), this._region); + }; + return MNSTopic; + }(AliMNS.MNS)); + AliMNS.MNSTopic = MNSTopic; +})(AliMNS || (AliMNS = {})); +/// +/// +/// +var AliMNS; +(function (AliMNS) { + // The Topic + var Topic = (function () { + // The constructor. name & account is required. + // region can be "hangzhou", "beijing" or "qingdao", the default is "hangzhou" + function Topic(name, account, region) { + this._region = "hangzhou"; + this._pattern = "http://%s.mns.cn-%s.aliyuncs.com/topics/%s"; + this._name = name; + this._account = account; + if (region) + this._region = region; + // make url + this._urlAttr = this.makeAttrURL(); + this._urlSubscription = this.makeSubscriptionURL(); + this._urlPublish = this.makePublishURL(); + // create the OpenStack object + this._openStack = new AliMNS.OpenStack(account); + } + Topic.prototype.getName = function () { return this._name; }; + Topic.prototype.getAccount = function () { return this._account; }; + Topic.prototype.getRegion = function () { return this._region; }; + // 获取Topic的属性值 + Topic.prototype.getAttrsP = function () { + debug("GET " + this._urlAttr); + return this._openStack.sendP("GET", this._urlAttr); + }; + // 设置Topic的属性值 + Topic.prototype.setAttrsP = function (options) { + var body = { Topic: options }; + debug("PUT " + this._urlAttr, body); + return this._openStack.sendP("PUT", this._urlAttr + "?metaoverride=true", body); + }; + // List all subscriptions. + Topic.prototype.listP = function (prefix, pageSize, pageMarker) { + var headers = {}; + if (prefix) + headers["x-mns-prefix"] = prefix; + if (pageMarker) + headers["x-mns-marker"] = pageMarker; + if (pageSize) + headers["x-mns-ret-number"] = pageSize; + var url = this._urlSubscription.slice(0, -1); + debug("GET " + url); + return this._openStack.sendP("GET", url, null, headers); + }; + Topic.prototype.subscribeP = function (name, endPoint, notifyStrategy, notifyContentFormat) { + var body = { + Subscription: { + Endpoint: endPoint + } + }; + if (notifyStrategy) + body.Subscription['NotifyStrategy'] = notifyStrategy; + if (notifyContentFormat) + body.Subscription['NotifyContentFormat'] = notifyContentFormat; + var url = Url.resolve(this._urlSubscription, name); + debug("PUT " + url, body); + return this._openStack.sendP("PUT", url, body); + }; + Topic.prototype.unsubscribeP = function (name) { + var url = Url.resolve(this._urlSubscription, name); + debug("DELETE " + url); + return this._openStack.sendP("DELETE", url); + }; + Topic.prototype.publishP = function (msg, b64) { + var body = { + Message: { + MessageBody: b64 ? this.utf8ToBase64(msg) : msg + } + }; + debug("POST " + this._urlPublish, body); + return this._openStack.sendP("POST", this._urlPublish, body); + }; + Topic.prototype.utf8ToBase64 = function (src) { + var buf = new Buffer.Buffer(src, 'utf8'); + return buf.toString('base64'); + }; + Topic.prototype.makeAttrURL = function () { + return Util.format(this._pattern, this._account.getAccountId(), this._region, this._name); + }; + Topic.prototype.makeSubscriptionURL = function () { + return this.makeAttrURL() + "/subscriptions/"; + }; + Topic.prototype.makePublishURL = function () { + return this.makeAttrURL() + "/messages"; + }; + return Topic; + }()); + AliMNS.Topic = Topic; +})(AliMNS || (AliMNS = {})); +/// +/// +/// +var AliMNS; +(function (AliMNS) { + // The Subscription + var Subscription = (function () { + // The constructor. name & topic is required. + function Subscription(name, topic) { + this._pattern = "http://%s.mns.cn-%s.aliyuncs.com/topics/%s/subscriptions/%s"; + this._name = name; + this._topic = topic; + // make url + this._urlAttr = this.makeAttrURL(); + // create the OpenStack object + this._openStack = new AliMNS.OpenStack(topic.getAccount()); + } + Subscription.prototype.getName = function () { return this._name; }; + Subscription.prototype.getTopic = function () { return this._topic; }; + // 获取Subscription的属性值 + Subscription.prototype.getAttrsP = function () { + debug("GET " + this._urlAttr); + return this._openStack.sendP("GET", this._urlAttr); + }; + // 设置Subscription的属性值 + Subscription.prototype.setAttrsP = function (options) { + var body = { Subscription: options }; + debug("PUT " + this._urlAttr, body); + return this._openStack.sendP("PUT", this._urlAttr + "?metaoverride=true", body); + }; + Subscription.prototype.makeAttrURL = function () { + return Util.format(this._pattern, this._topic.getAccount().getAccountId(), this._topic.getRegion(), this._topic.getName(), this._name); + }; + Subscription.NotifyStrategy = { + BACKOFF_RETRY: "BACKOFF_RETRY", + EXPONENTIAL_DECAY_RETRY: "EXPONENTIAL_DECAY_RETRY" + }; + Subscription.NotifyContentFormat = { + XML: "XML", + SIMPLIFIED: "SIMPLIFIED" + }; + return Subscription; + }()); + AliMNS.Subscription = Subscription; +})(AliMNS || (AliMNS = {})); + +//# sourceMappingURL=index.js.map diff --git a/package.json b/package.json index ec8dde8..ae13682 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "ali-mns", - "version": "2.2.3", + "version": "2.3.0", "description": "The nodejs SDK for aliyun mns service", "main": "index.js", "dependencies": { @@ -11,16 +11,18 @@ "xmlbuilder": "^3.1.0" }, "devDependencies": { - "grunt": "^0.4.5", - "grunt-contrib-clean": "^0.6.0", - "grunt-newer": "^1.1.1", - "grunt-typescript": "^0.7.0", - "load-grunt-tasks": "^3.3.0", + "del": "^2.2.0", + "gulp": "^3.9.1", + "gulp-debug": "^2.1.2", + "gulp-newer": "^1.1.0", + "gulp-sourcemaps": "^1.6.0", + "gulp-typescript": "^2.12.2", + "mocha": "^2.4.5", "should": "^7.1.1" }, "scripts": { - "prepublish": "grunt", - "test": "mocha" + "prepublish": "gulp", + "test": "node node_modules/mocha/bin/mocha" }, "repository": { "type": "git", diff --git a/test/performance.js b/test/performance.js index ea26db2..390a04b 100644 --- a/test/performance.js +++ b/test/performance.js @@ -7,7 +7,7 @@ var Promise = require("promise"); var AliMNS = require(Path.join(__dirname, "../index.js")); var debugTest = require("debug")("ali-mns.test"); -describe('AliMNS-performance', function(){ +describe.skip('AliMNS-performance', function(){ this.timeout(1000 * 30); // ali account configuration var aliCfg = { diff --git a/test/topic.js b/test/topic.js new file mode 100644 index 0000000..58a4a89 --- /dev/null +++ b/test/topic.js @@ -0,0 +1,178 @@ +// mocha test + +var assert = require("assert"); +var Path = require("path"); +var fs = require("fs"); +var http = require("http"); +var Promise = require("promise"); +var AliMNS = require(Path.join(__dirname, "../index.js")); + +describe('AliMNS-topic', ()=>{ + // ali account configuration + var aliCfg = { + accountId: "your-account-id", + keyId: "your-key-id", + keySecret: "your-key-secret", + region: "hangzhou", + topicName: "dev", + endPoint: "https://www.baidu.com/ali-mns-ep", + port: 80 + }; + + // test/account.js contains sensitive data, and will not be tracked by git + var cfgPath = Path.join(__dirname, "account.js"); + if(fs.existsSync(cfgPath)){ + aliCfg = require(cfgPath); + // Topic截至2016年4月时仅有深圳可用 + aliCfg.region = "shenzhen"; + } + var account = new AliMNS.Account(aliCfg.accountId, aliCfg.keyId, aliCfg.keySecret); + var mns = new AliMNS.MNSTopic(account, aliCfg.region); + + describe('Topic', function(){ + this.timeout(1000 * 5); + + var topicName = aliCfg.topicName + Math.floor(Math.random() * 10000); + var subName = topicName + '-sub' + Math.floor(Math.random() * 10000); + var topic = new AliMNS.Topic(topicName, account, aliCfg.region); + var subscription = new AliMNS.Subscription(subName, topic); + + it('#createTopicP', (done)=>{ + mns.createTopicP(topicName, { + MaximumMessageSize: 65536, + LoggingEnabled: false + }).then((data)=>{ done(); }, done); + }); + + it('#listTopicP', (done)=>{ + mns.listTopicP(topicName, 1).then((data)=>{ + // console.info(data.Topics.Topic); + done(); }, done); + }); + + it('#setAttrsP & #getAttrsP', (done)=>{ + var testSource = 1024; + + topic.setAttrsP({ MaximumMessageSize: testSource }) + .then((dataSet)=>{ + // console.info(dataSet); + return topic.getAttrsP(); + }) + .then((dataGet)=>{ + // console.info(dataGet); + assert.equal(dataGet.Topic.MaximumMessageSize, testSource); + }) + .then(()=>{ done(); }, done); + }); + + it('#subscribe', (done)=>{ + topic.subscribeP(subName, aliCfg.endPoint, + AliMNS.Subscription.NotifyStrategy.BACKOFF_RETRY, + AliMNS.Subscription.NotifyContentFormat.SIMPLIFIED) + .then((data)=>{ + // console.info(data); + done(); }, done); + }); + + it('#listP', (done)=>{ + topic.listP().then((data)=>{ + // console.info(data.Subscriptions); + done(); }, done); + }); + + it('Subscription #setAttrsP & #getAttrsP', (done)=>{ + subscription.setAttrsP({ NotifyStrategy: AliMNS.Subscription.NotifyStrategy.EXPONENTIAL_DECAY_RETRY }) + .then((dataSet)=>{ + // console.info(dataSet); + return subscription.getAttrsP(); + }) + .then((dataGet)=>{ + // console.info(dataGet); + assert.equal(dataGet.Subscription.NotifyStrategy, AliMNS.Subscription.NotifyStrategy.EXPONENTIAL_DECAY_RETRY); + }) + .then(()=>{ done(); }, done); + }); + + it('#publishP', (done)=>{ + topic.publishP("Hello") + .then((data)=>{ + // console.info(data); + done(); }, done); + }); + + it('#unsubscribe', (done)=>{ + topic.unsubscribeP(subName) + .then(()=>{ done(); }, done); + }); + + it('#deleteTopicP', (done)=>{ + mns.deleteTopicP(topicName) + .then(()=>{ done(); }, done); + }); + }); + + describe('Topic-Notify', function(){ + this.timeout(1000 * 10); + + var topicName = aliCfg.topicName + Math.floor(Math.random() * 10000); + var subName = topicName + '-sub' + Math.floor(Math.random() * 10000); + var topic = new AliMNS.Topic(topicName, account, aliCfg.region); + var server = null; + var nx = null; + + it('prepare-create-topic', (done)=>{ + mns.createTopicP(topicName) + .then(()=>{ done(); }, done); + }); + + it('prepare-subscribe', (done)=>{ + topic.subscribeP(subName, aliCfg.endPoint, + AliMNS.Subscription.NotifyStrategy.BACKOFF_RETRY, + AliMNS.Subscription.NotifyContentFormat.SIMPLIFIED) + .then(()=>{ done(); }, done); + }); + + it('prepare-http', ()=>{ + nx = new Promise((resolve, reject)=>{ + server = http.createServer((request, response)=>{ + var chunks = []; + request.on('data', (chunk)=>{ + chunks.push(chunk); + }); + request.on('end', ()=>{ + var buf = Buffer.concat(chunks); + + response.writeHead(204, {'Content-Type': 'text/plain'}); + response.end(); + resolve({ + url: request.url, + headers: request.headers, + data: buf.toString() + }); + }); + + + }); + server.listen(aliCfg.port); + }); + }); + + it.skip('wait-notify', (done)=>{ + var tmo = setTimeout(()=>{ + done(new Error("timeout")); + }, 1000*8); + nx.then((data)=>{ + console.info(data); + clearTimeout(tmo); + done(); + }); + topic.publishP("Hello"); + }); + + it('clean', (done)=>{ + server.close(); + mns.deleteTopicP(topicName) + .then(()=>{ done(); }, done); + }); + }); +}); \ No newline at end of file diff --git a/ts/Interfaces.ts b/ts/Interfaces.ts index 4484d5a..7e95e2f 100644 --- a/ts/Interfaces.ts +++ b/ts/Interfaces.ts @@ -10,6 +10,15 @@ module AliMNS{ deleteP(name:string); } + export interface IMNSTopic extends IMNS{ + // List all topics. + listTopicP(prefix?:string, pageSize?:number, pageMarker?:string); + // Create a topic + createTopicP(name:string, options?:any); + // Delete a topic + deleteTopicP(name:string); + } + export interface IMQ{ // 获取MQ的属性值 getAttrsP(); @@ -46,4 +55,30 @@ module AliMNS{ export interface INotifyRecvBatch extends INotifyRecv{ notifyRecv(cb:(ex:Error, msg:any)=>Boolean, waitSeconds?:number, numOfMessages?:number); } + + export interface ITopic{ + // 获取Topic的属性值 + getAttrsP(); + // 设置Topic的属性值 + setAttrsP(options:any); + // List all subscriptions. + listP(prefix?:string, pageSize?:number, pageMarker?:string); + // Subscribe a topic. + subscribeP(name:string, endPoint:string, notifyStrategy?:string, notifyContentFormat?:string); + // Unsubscribe a topic. + unsubscribeP(name:string); + // Publish a message. + publishP(msg:string, b64: boolean); + } + + export interface ISubscription{ + // 获取Subscription的属性值 + getAttrsP(); + // 设置Subscription的属性值 + setAttrsP(options:any); + } + + export interface ITopicNotify{ + notifyP(request:any); + } } \ No newline at end of file diff --git a/ts/MNS.ts b/ts/MNS.ts index 5f2445e..49fa4d7 100644 --- a/ts/MNS.ts +++ b/ts/MNS.ts @@ -49,11 +49,11 @@ module AliMNS{ return Util.format(this._pattern, this._account.getAccountId(), this._region); } - private _account:Account; // Ali account - private _region = "hangzhou"; // region: hangzhou, beijing, qingdao + protected _account:Account; // Ali account + protected _region = "hangzhou"; // region: hangzhou, beijing, qingdao private _pattern = "http://%s.mns.cn-%s.aliyuncs.com/queues/"; private _url:string; // mns url - private _openStack: OpenStack; + protected _openStack: OpenStack; } // For compatible v1.x diff --git a/ts/MNSTopic.ts b/ts/MNSTopic.ts new file mode 100644 index 0000000..8004e9e --- /dev/null +++ b/ts/MNSTopic.ts @@ -0,0 +1,45 @@ +/// + +module AliMNS{ + export class MNSTopic extends MNS implements IMNSTopic{ + public constructor(account:Account, region?:string){ + super(account, region); + // make url + this._urlTopic = this.makeTopicURL(); + } + + // List all topics. + public listTopicP(prefix?:string, pageSize?:number, pageMarker?:string){ + var headers = {}; + if(prefix) headers["x-mns-prefix"] = prefix; + if(pageMarker) headers["x-mns-marker"] = pageMarker; + if(pageSize) headers["x-mns-ret-number"] = pageSize; + var url = this._urlTopic.slice(0, -1); + debug("GET " + url); + return this._openStack.sendP("GET", url, null, headers); + } + + // Create a topic + public createTopicP(name:string, options?:any){ + var body = { Topic: "" }; + if(options) body.Topic = options; + var url = Url.resolve(this._urlTopic, name); + debug("PUT " + url, body); + return this._openStack.sendP("PUT", url, body); + } + + // Delete a topic + public deleteTopicP(name:string){ + var url = Url.resolve(this._urlTopic, name); + debug("DELETE " + url); + return this._openStack.sendP("DELETE", url); + } + + private makeTopicURL(){ + return Util.format(this._patternTopic, this._account.getAccountId(), this._region); + } + + private _patternTopic = "http://%s.mns.cn-%s.aliyuncs.com/topics/"; + private _urlTopic:String; + } +} \ No newline at end of file diff --git a/ts/OpenStack.ts b/ts/OpenStack.ts index ec931e1..ebe0b5d 100644 --- a/ts/OpenStack.ts +++ b/ts/OpenStack.ts @@ -71,7 +71,7 @@ module AliMNS{ headers["Content-MD5"] = contentMD5; } - // `Dat`e & `Host` will be added by request automatically + // `Date` & `Host` will be added by request automatically if(!headers["x-mns-version"]) headers["x-mns-version"] = this._version; // lowercase & sort & extract the x-mns- diff --git a/ts/Subscription.ts b/ts/Subscription.ts new file mode 100644 index 0000000..23df126 --- /dev/null +++ b/ts/Subscription.ts @@ -0,0 +1,57 @@ +/// +/// +/// + +module AliMNS{ + // The Subscription + export class Subscription implements ISubscription{ + // The constructor. name & topic is required. + constructor(name:string, topic:Topic){ + this._name = name; + this._topic = topic; + + // make url + this._urlAttr = this.makeAttrURL(); + + // create the OpenStack object + this._openStack = new OpenStack(topic.getAccount()); + } + + public getName(){ return this._name; } + public getTopic(){ return this._topic; } + + // 获取Subscription的属性值 + public getAttrsP(){ + debug("GET " + this._urlAttr); + return this._openStack.sendP("GET", this._urlAttr); + } + + // 设置Subscription的属性值 + public setAttrsP(options:any){ + var body = { Subscription: options }; + debug("PUT " + this._urlAttr, body); + return this._openStack.sendP("PUT", this._urlAttr + "?metaoverride=true", body); + } + + public static NotifyStrategy = { + BACKOFF_RETRY : "BACKOFF_RETRY", + EXPONENTIAL_DECAY_RETRY : "EXPONENTIAL_DECAY_RETRY" + }; + + public static NotifyContentFormat = { + XML : "XML", + SIMPLIFIED : "SIMPLIFIED" + }; + + private makeAttrURL(){ + return Util.format(this._pattern, this._topic.getAccount().getAccountId(), this._topic.getRegion(), this._topic.getName(), this._name); + } + + protected _openStack: OpenStack; + + private _name: string; + private _topic: Topic; + private _urlAttr: string; // Subscription attr url + private _pattern = "http://%s.mns.cn-%s.aliyuncs.com/topics/%s/subscriptions/%s"; + } +} \ No newline at end of file diff --git a/ts/Topic.ts b/ts/Topic.ts new file mode 100644 index 0000000..c1f111a --- /dev/null +++ b/ts/Topic.ts @@ -0,0 +1,108 @@ +/// +/// +/// + +module AliMNS{ + // The Topic + export class Topic implements ITopic{ + // The constructor. name & account is required. + // region can be "hangzhou", "beijing" or "qingdao", the default is "hangzhou" + constructor(name:string, account:Account, region?:string){ + this._name = name; + this._account = account; + if(region) this._region = region; + + // make url + this._urlAttr = this.makeAttrURL(); + this._urlSubscription = this.makeSubscriptionURL(); + this._urlPublish = this.makePublishURL(); + + // create the OpenStack object + this._openStack = new OpenStack(account); + } + + public getName(){ return this._name; } + public getAccount(){ return this._account; } + public getRegion(){ return this._region; } + + // 获取Topic的属性值 + public getAttrsP(){ + debug("GET " + this._urlAttr); + return this._openStack.sendP("GET", this._urlAttr); + } + + // 设置Topic的属性值 + public setAttrsP(options:any){ + var body = { Topic: options }; + debug("PUT " + this._urlAttr, body); + return this._openStack.sendP("PUT", this._urlAttr + "?metaoverride=true", body); + } + + // List all subscriptions. + public listP(prefix?:string, pageSize?:number, pageMarker?:string){ + var headers = {}; + if(prefix) headers["x-mns-prefix"] = prefix; + if(pageMarker) headers["x-mns-marker"] = pageMarker; + if(pageSize) headers["x-mns-ret-number"] = pageSize; + var url = this._urlSubscription.slice(0, -1); + debug("GET " + url); + return this._openStack.sendP("GET", url, null, headers); + } + + public subscribeP(name:string, endPoint:string, notifyStrategy?:string, notifyContentFormat?:string){ + var body = { + Subscription: { + Endpoint: endPoint + } + }; + if(notifyStrategy) body.Subscription['NotifyStrategy'] = notifyStrategy; + if(notifyContentFormat) body.Subscription['NotifyContentFormat'] = notifyContentFormat; + var url = Url.resolve(this._urlSubscription, name); + debug("PUT " + url, body); + return this._openStack.sendP("PUT", url, body); + } + + public unsubscribeP(name:string){ + var url = Url.resolve(this._urlSubscription, name); + debug("DELETE " + url); + return this._openStack.sendP("DELETE", url); + } + + public publishP(msg:string, b64:boolean){ + var body = { + Message: { + MessageBody: b64?this.utf8ToBase64(msg):msg + } + }; + debug("POST " + this._urlPublish, body); + return this._openStack.sendP("POST", this._urlPublish, body); + } + + protected utf8ToBase64(src){ + var buf = new Buffer.Buffer(src, 'utf8'); + return buf.toString('base64'); + } + + private makeAttrURL(){ + return Util.format(this._pattern, this._account.getAccountId(), this._region, this._name); + } + + private makeSubscriptionURL(){ + return this.makeAttrURL() + "/subscriptions/"; + } + + private makePublishURL(){ + return this.makeAttrURL() + "/messages"; + } + + private _urlSubscription:string; // topic subscription url + private _urlPublish:string; // publish message url + protected _openStack: OpenStack; + + private _name: string; + private _region = "hangzhou"; + private _account: Account; + private _urlAttr: string; // topic attr url + private _pattern = "http://%s.mns.cn-%s.aliyuncs.com/topics/%s"; + } +} \ No newline at end of file diff --git a/ts/tsconfig.json b/ts/tsconfig.json new file mode 100644 index 0000000..620eb7f --- /dev/null +++ b/ts/tsconfig.json @@ -0,0 +1,15 @@ +{ + "compilerOptions": { + "module": "commonjs", + "noImplicitAny": false, + "removeComments": false, + "preserveConstEnums": true, + "outFile": "index.js", + "sourceMap": true, + "rootDir": "ts/" + }, + "exclude": [ + "node_modules", + "dts" + ] +} \ No newline at end of file