Skip to content

Commit

Permalink
improve subscriber, update readme.
Browse files Browse the repository at this point in the history
  • Loading branch information
robin committed Aug 1, 2013
1 parent 98626d6 commit 4550898
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 32 deletions.
9 changes: 6 additions & 3 deletions README.md
Expand Up @@ -4,13 +4,16 @@ Example Usage
-----------------

```scala
val nsq = NsqClient.create("MindtalkClient", "MindtalkClientApp",
"localhost:4161", "mindtalk")
val nsq = NsqSubscriber("Mindtalk", "MindtalkApp",
"127.0.0.1:4161", "mindtalk", "nsqie")

nsq.subscribe("mindtalk", "nsqie"){ case (topic, channel, msg) =>
nsq.listen { case (topic, channel, msg) =>
println("got data %s from topic %s in channel %s".format(msg, topic, channel))
MessageHandleReturn.SUCCESS
}

NsqClient.publish("127.0.0.1:4151", "mindtalk",
"hello " + System.currentTimeMillis() + " :P")
```

Under heavy development, don't use this in production.
Expand Down
1 change: 0 additions & 1 deletion src/main/scala/com/ansvia/nsqie/NSQFrameDecoder.scala
Expand Up @@ -93,7 +93,6 @@ class NSQFrameDecoder extends FrameDecoder with Slf4jLogger {
}

private def sendNOP(implicit channel:Channel){
println("send NOP")
write(Nop)
}

Expand Down
34 changes: 6 additions & 28 deletions src/main/scala/com/ansvia/nsqie/Nsqie.scala
Expand Up @@ -45,6 +45,7 @@ case class NsqSubscriber(name:String, nameLong:String,
val client = NsqClient(producerHost, name, nameLong, rdyCount)
client.listeners :+= new RestartListener {
override def apply(nsq: NsqClient) {
debug("listener called by " + nsq)
// client.listeners.clear()
restart()
listen(callback)
Expand Down Expand Up @@ -85,9 +86,9 @@ case class NsqSubscriber(name:String, nameLong:String,
val resp:HttpResponse = Await.result(
httpClient(RequestBuilder().url("http://" + lookupHost + "/lookup?topic=" + topic).buildGet()))

debug("nsqd resp: " + resp)
// debug("nsqd resp: " + resp)
val content = resp.getContent.toString(CharsetUtil.UTF_8)
debug("content: " + content)
// debug("content: " + content)
val json = parse(content)
var producers = ArrayBuffer.empty[String]
for {
Expand Down Expand Up @@ -188,13 +189,14 @@ case class NsqClient(hostNPort:String, shortId:String, longId:String, rdyCount:I
def identify(data:String) = {
val bf = ByteBuffer.allocate(13 + 4 + data.getBytes.length).order(ByteOrder.BIG_ENDIAN)

bf.put(" V2IDENTIFY\n".getBytes)
bf.put(" V2".getBytes)
bf.put("IDENTIFY\n".getBytes)
bf.putInt(data.length)
bf.put(data.getBytes)

val payload = new String(bf.array())
debug("payload: " + payload)
Await.result(client(payload)) match {
Await.result(client(payload), 15.seconds) match {
case OK =>
connected = true
debug("connected: " + connected)
Expand All @@ -216,24 +218,6 @@ case class NsqClient(hostNPort:String, shortId:String, longId:String, rdyCount:I
client(payload)
}

// def reset(){
// connected = false
// inited = false
// client.close()
// client = buildClient()
//
// client(" V2").onSuccess { data => data match {
// case OK =>
// connected = true
// debug("connected: " + connected)
// if (retrier != null){
// retrier.run()
// }
// case x =>
// error("cannot identify, returned from nsqd: " + x)
// }
// }
// }

private def identifyInternal(){
identify("""{"short_id":"%s","long_id":"%s"}""".format(shortId, longId))
Expand Down Expand Up @@ -292,12 +276,6 @@ case class NsqClient(hostNPort:String, shortId:String, longId:String, rdyCount:I
def nop()(implicit ctx:SubscribeContext){
_dispatch(NOP + "\n")
.onSuccess(feed)
.onFailure {
case e =>
error(e.getMessage)
inited = false
ensureInit()
}
}

private def buildClient() = {
Expand Down

0 comments on commit 4550898

Please sign in to comment.