Skip to content

abusix/knsq

Repository files navigation

knsq

Maven Central GitHub issues JDK Version Kotlin Version GitHub

A NSQ client library written in Kotlin, some parts based on nsqj.

Features

  • Subscribers with nsqlookupd support
  • Publishers
  • HTTP API wrapper for nsqd and nsqlookupd
  • Subscriber backoff
  • TLS support
  • Snappy and DEFLATE compression
  • Sampling
  • Authorization
  • Full error and exception control
  • Scalable

Maven and gradle configuration

Add the dependency using Maven

<dependency>
    <groupId>com.abusix</groupId>
    <artifactId>knsq</artifactId>
    <version>1.4.0</version>
</dependency>

or Gradle

dependencies {
  compile 'com.abusix:knsq:1.4.0'
}

Getting started

Publish data

val pub = Publisher("nsqd.example.org:4150")

pub.onException = {
    // handle async exceptions here
    println(it)
}

// sync publish
pub.publish("sample.topic", "Hello!".toByteArray())

// async buffered publish for better performance and throughput
pub.publishBuffered("sample.other", "Hi, I am async!".toByteArray())

pub.stop()

Publishers are low-level, non self-recovering instances. Once a connection is broken or closed, they will not reconnect automatically. Calls to Publisher.connect() will not succeed afterwards. If a connection has to be re-established, a new instance of Publisher has to be created.

Publishers can be configured the following ways:

  • Constructor parameters (see below)
  • Individual Batcher configurations, accessible via Publisher.getBatcher(topic)

Subscribe

val subscriber = Subscriber("nsqlookupd.example.org:4161")

val subscription = subscriber.subscribe("sample.topic", "channel", { msg ->
    println(msg)
}, onFailedMessage = { msg ->
    println("Message failed too many times: $msg")
}, onException = { e ->
    // exception happened on client-side
    println(e)
}, onNSQError = { e ->
    // nsq returned unexpected error
    println(e)
})

Subscribers are high-level, self-recovering instances. Broken connections will be re-established and new nsqd instances will be discovered via nsqlookupd dynamically.

You can control the message flow manually using several configuration options on the Subscription object returned by subscriber.subscribe. Further configuration is available on the Subscriber object and its constructor. Messages can also be controlled directly using methods on the message object:

val subscription = subscriber.subscribe("sample.topic", "channel", onMessage = { msg ->
    msg.requeue() // requeue the message, with optional delay
    msg.finish() // tell nsqd that you received the message
    msg.touch() // reset server-side timeouts
})

Directly subscribe to nsqd

As an alternative to nsqlookupd, it is also possible to directly connect to a nsqd instance using DirectSubscriber. Those objects are self-recovering from errors as well and support all operations of normal Subscriber objects.

val subscriber = DirectSubscriber("nsqd.example.org:4150")

val subscription = subscriber.subscribe("sample.topic", "channel", onMessage = { msg ->
    println(msg)
})

Customizing connection parameters

Publisher, Subscriber and DirectSubscriber can be customized by providing ClientConfig objects via the constructor. Most of the parameters in this config are immutable and must be created with the object before creating a Publisher, Subscriber or DirectSubscriber.

val clientConfig = ClientConfig(
    clientId = "sampleId",
    authSecret = "IAMASECRET".toByteArray(),
    tls = false
)

val pub = Publisher("nsqd.example.org:4150", clientConfig = clientConfig)

For the full set of options in ClientConfig and their default values, please see the Dokka or JavaDoc documentation below.

Further Documentation

License

This library is licensed under the MIT License.