Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

initial commit (wip)

  • Loading branch information...
commit ba5e9dbf6f01e52d43f6ee87c4af968a8e668060 0 parents
@dannycoates authored
1  .gitignore
@@ -0,0 +1 @@
+node_modules
47 client.js
@@ -0,0 +1,47 @@
+module.exports = function (
+ net,
+ inherits,
+ EventEmitter,
+ ReadableStream,
+ Receiver,
+ FetchRequest,
+ ProduceRequest) {
+
+ function Client(options) {
+ var self = this
+ this.connection = net.connect(options)
+ this.connection.on('connect',
+ function () {
+ self.emit('connect')
+ }
+ )
+ this.connection.on('end',
+ function () {
+ self.emit('end')
+ self.connection = null
+ }
+ )
+ this.readableSteam = new ReadableStream()
+ this.readableSteam.wrap(this.connection)
+ this.receiver = new Receiver(this.readableSteam)
+ }
+ inherits(Client, EventEmitter)
+
+ Client.prototype.fetch = function (offset, maxSize, cb) {
+ var request = new FetchRequest()
+ request.offset = offset
+ request.maxSize = maxSize
+ //TODO something with these return values
+ request.serialize(this.connection)
+ this.receiver.push(request, cb)
+ }
+
+ Client.prototype.produce = function (topic, messages) {
+ var request = new ProduceRequest()
+ request.messages = messages
+ //TODO topic
+ request.serialize(this.connection)
+ }
+
+ return Client
+}
15 index.js
@@ -0,0 +1,15 @@
+var net = require('net')
+var inherits = require('util').inherits
+var EventEmitter = require('events').EventEmitter
+var protocol = require('./protocol')
+var ReadableStream = require('readable-stream')
+var Client = require('./client')(
+ net,
+ inherits,
+ EventEmitter,
+ ReadableStream,
+ protocol.Receiver,
+ protocol.FetchRequest,
+ protocol.ProduceRequest)
+
+module.exports = Client
24 package.json
@@ -0,0 +1,24 @@
+{
+ "name": "franz-kafka",
+ "version": "0.7.0",
+ "description": "Kafka Client http://incubator.apache.org/kafka/",
+ "main": "index.js",
+ "dependencies" : {
+ "readable-stream": "*",
+ "snappy": "*",
+ "buffer-crc32": "*"
+ },
+ "scripts": {
+ "test": "echo \"Error: no test specified\" && exit 1"
+ },
+ "repository": {
+ "type": "git",
+ "url": "git://github.com/dannycoates/franz-kafka.git"
+ },
+ "keywords": [
+ "kafka",
+ "awesome"
+ ],
+ "author": "Danny Coates",
+ "license": "BSD"
+}
28 protocol/fetch-body.js
@@ -0,0 +1,28 @@
+module.exports = function (
+ inherits,
+ State,
+ Message) {
+
+ function FetchBody(bytes) {
+ State.call(this, bytes)
+ }
+ inherits(FetchBody, State)
+
+ FetchBody.prototype.parse = function () {
+ console.assert(this.complete())
+ var messages = []
+ var offset = 0
+ while (offset < this.buffer.length) {
+ var len = this.buffer.readUInt32BE(offset)
+ messages.push(Message.parse(this.buffer.slice(offset, offset + len + 4)))
+ offset += (len + 4)
+ }
+ return messages
+ }
+
+ FetchBody.prototype.next = function () {
+ return this.parse()
+ }
+
+ return FetchBody
+}
21 protocol/fetch-request.js
@@ -0,0 +1,21 @@
+module.exports = function (
+ RequestHeader) {
+
+ function FetchRequest() {
+ this.header = null
+ this.offset = []
+ this.maxSize = 0
+ }
+
+ FetchRequest.prototype.serialize = function (stream) {
+ var payload = new Buffer(12)
+ payload.writeUInt32BE(this.offset[0], 0)
+ payload.writeUInt32BE(this.offset[1], 4)
+ payload.writeUInt32BE(this.maxSize, 8)
+ this.header = new RequestHeader(payload.length, 1, "test")
+ this.header.serialize(stream)
+ return stream.write(payload)
+ }
+
+ return FetchRequest
+}
23 protocol/index.js
@@ -0,0 +1,23 @@
+var inherits = require('util').inherits
+var zlib = require('zlib')
+var snappy = require('snappy')
+var crc32 = require('buffer-crc32')
+var NullState = require('./nullstate')()
+var State = require('./state')()
+var Message = require('./message')(zlib, snappy, crc32)
+var RequestHeader = require('./request-header')()
+var ResponseHeader = require('./response-header')(inherits, State)
+var FetchBody = require('./fetch-body')(inherits, State, Message)
+var OffsetsBody = require('./offsets-body')(inherits, State)
+var Response = require('./response')(ResponseHeader, FetchBody, OffsetsBody)
+var Receiver = require('./receiver')(NullState, Response)
+var FetchRequest = require('./fetch-request')(RequestHeader)
+var OffsetsRequest = require('./offsets-request')(RequestHeader)
+var ProduceRequest = require('./produce-request')(RequestHeader)
+
+module.exports = {
+ Receiver: Receiver,
+ FetchRequest: FetchRequest,
+ ProduceRequest: ProduceRequest,
+ Message: Message
+}
109 protocol/message.js
@@ -0,0 +1,109 @@
+module.exports = function (zlib, snappy, crc32) {
+
+ function Message() {
+ this.magic = 0
+ this.compression = 0
+ this.checksum = 0
+ this.payload = null
+ }
+
+ Message.compression = {
+ NONE: 0,
+ GZIP: 1,
+ SNAPPY: 2
+ }
+
+ Message.prototype.length = function () {
+ return this.payload.length + 6
+ }
+
+ Message.prototype.toBuffer = function () {
+ var buffer = new Buffer(this.payload.length + 10)
+ buffer.writeUInt32BE(this.payload.length + 6, 0)
+ buffer.writeUInt8(this.magic, 4)
+ buffer.writeUInt8(this.compression, 5)
+ buffer.writeUInt32BE(this.checksum, 6)
+ this.payload.copy(buffer, 10)
+ return buffer
+ }
+
+ Message.prototype.getData = function (cb) {
+ if (this.magic && this.compression) {
+ var self = this
+ function uncompressed(err, data) {
+ if (err) {
+ return cb(err)
+ }
+ self.payload = data
+ self.compression = Message.compression.NONE
+ cb(null, data)
+ }
+ if (this.compression === Message.compression.GZIP) {
+ zlib.gunzip(this.payload, uncompressed)
+ }
+ else if (this.compression === Message.compression.SNAPPY) {
+ snappy.uncompress(this.payload, uncompressed)
+ }
+ else {
+ cb(new Error("Unknown compression " + this.compression))
+ }
+ }
+ else {
+ cb(null, this.payload)
+ }
+ }
+
+ function compress(buffer, method, cb) {
+ switch (method) {
+ case Message.compression.GZIP:
+ zlib.gzip(buffer, cb)
+ break;
+ case Message.compression.SNAPPY:
+ snappy.compress(buffer, cb)
+ break;
+ default:
+ cb(null, buffer)
+ }
+ }
+
+ Message.prototype.setData = function (buffer, compression, cb) {
+ var self = this
+ if (compression) {
+ this.magic = 1
+ this.compression = compression
+ compress(buffer, compression,
+ function (err, compressed) {
+ if (err) {
+ return cb(err)
+ }
+ self.payload = compressed
+ self.checksum = crc32.unsigned(compressed)
+ return cb(null, self)
+ }
+ )
+ }
+ else {
+ this.magic = 0
+ this.compression = 0
+ this.payload = buffer
+ this.checksum = crc32.unsigned(buffer)
+ cb(null, this)
+ }
+ }
+
+ Message.parse = function (buffer) {
+ var m = new Message()
+ var payload = new Buffer(buffer.length - 10)
+ buffer.copy(payload, 0, 10)
+ m.magic = buffer.readUInt8(4)
+ m.compression = buffer.readUInt8(5)
+ m.checksum = buffer.readUInt32BE(6)
+ m.payload = payload // TODO maybe slice to save a copy
+ if (crc32.unsigned(payload) !== m.checksum) {
+ console.error("Mismatched checksum")
+ }
+ return m
+ }
+
+ return Message
+}
6 protocol/nullstate.js
@@ -0,0 +1,6 @@
+module.exports = function () {
+ function NullState() {}
+ NullState.prototype.read = function () { return false }
+ NullState.prototype.complete = function () { return true }
+ return NullState
+}
27 protocol/offsets-body.js
@@ -0,0 +1,27 @@
+module.exports = function (
+ inherits,
+ State) {
+
+ function OffsetsBody(bytes) {
+ State.call(this, bytes)
+ }
+ inherits(OffsetsBody, State)
+
+ OffsetsBody.prototype.parse = function () {
+ console.assert(this.complete())
+ var offsets = []
+ var count = this.buffer.readUInt32BE(0)
+ for (var i = 0; i < count; i++) {
+ var high = 4 + (i * 8)
+ var low = 4 + (i * 8) + 4
+ offsets.push([this.buffer.readUInt32BE(high),this.buffer.readUInt32BE(low)])
+ }
+ return offsets
+ }
+
+ OffsetsBody.prototype.next = function () {
+ return this.parse()
+ }
+
+ return OffsetsBody
+}
11 protocol/offsets-request.js
@@ -0,0 +1,11 @@
+module.exports = function (
+ RequestHeader) {
+
+ function OffsetsRequest() {
+ this.header = new RequestHeader()
+ this.time = []
+ this.maxOffsets = 0
+ }
+
+ return OffsetsRequest
+}
24 protocol/produce-request.js
@@ -0,0 +1,24 @@
+module.exports = function (
+ RequestHeader) {
+
+ function ProduceRequest() {
+ this.header = null
+ this.messages = []
+ }
+
+ ProduceRequest.prototype.serialize = function (stream) {
+ var messageBuffers = this.messages.map(function (m) { return m.toBuffer() })
+ var messagesLength = messageBuffers.reduce(function (t, b) { return t + b.length }, 0)
+ this.header = new RequestHeader(messagesLength + 4, 0, 'test')
+ console.log("mlen " + messagesLength)
+ console.log(this.header.header.toString('hex'))
+ this.header.serialize(stream)
+ var mlen = new Buffer(4)
+ mlen.writeUInt32BE(messagesLength, 0)
+ console.log(mlen.toString('hex'))
+ stream.write(mlen)
+ messageBuffers.forEach(function (b) { stream.write(b) })
+ }
+
+ return ProduceRequest
+}
45 protocol/receiver.js
@@ -0,0 +1,45 @@
+module.exports = function (
+ NullState,
+ Response) {
+
+ var nullState = new NullState()
+
+ function Receiver(stream) {
+ var self = this
+ this.stream = stream
+ this.stream.on('readable',
+ function () {
+ self.read()
+ }
+ )
+ this.stream.on('end',
+ function () {
+ self.closed = true
+ }
+ )
+ this.queue = []
+ this.current = nullState
+ this.closed = false
+ }
+
+ Receiver.prototype.next = function () {
+ this.current = this.queue.shift() || nullState
+ }
+
+ Receiver.prototype.read = function () {
+ if (this.current.complete()) {
+ this.next()
+ }
+ while (this.current.read(this.stream)) {
+ this.next()
+ }
+ }
+
+ Receiver.prototype.push = function (request, cb) {
+ if (this.closed) { return false } // or something
+ this.queue.push(new Response(request, cb))
+ return true
+ }
+
+ return Receiver
+}
24 protocol/request-header.js
@@ -0,0 +1,24 @@
+module.exports = function () {
+
+ function RequestHeader(payloadLength, type, topic, partition) {
+ type = type || 0
+ topic = topic || ""
+ partition = partition || 0
+ payloadLength = payloadLength || 0
+ var topicLength = Buffer.byteLength(topic)
+ var length = payloadLength + topicLength + 8
+ this.header = new Buffer(topicLength + 12)
+ this.header.writeUInt32BE(length, 0)
+ this.header.writeUInt16BE(type, 4)
+ this.header.writeUInt16BE(topicLength, 6)
+ this.header.write(topic, 8)
+ this.header.writeUInt32BE(partition, this.header.length - 4)
+ this.type = type
+ }
+
+ RequestHeader.prototype.serialize = function (stream) {
+ return stream.write(this.header)
+ }
+
+ return RequestHeader
+}
25 protocol/response-header.js
@@ -0,0 +1,25 @@
+module.exports = function (
+ inherits,
+ State) {
+
+ function ResponseHeader(ResponseBody) {
+ this.length = 0
+ this.errno = 0
+ this.ResponseBody = ResponseBody
+ State.call(this, 6)
+ }
+ inherits(ResponseHeader, State)
+
+ ResponseHeader.prototype.parse = function () {
+ console.assert(this.complete())
+ this.length = this.buffer.readUInt32BE(0)
+ this.errno = this.buffer.readUInt16BE(4)
+ }
+
+ ResponseHeader.prototype.next = function () {
+ this.parse()
+ return new this.ResponseBody(this.length - 2)
+ }
+
+ return ResponseHeader
+}
41 protocol/response.js
@@ -0,0 +1,41 @@
+module.exports = function (
+ ResponseHeader,
+ FetchBody,
+ OffsetsBody) {
+
+ function Response(request, cb) {
+ switch (request.header.type) {
+ case 1:
+ this.state = new ResponseHeader(FetchBody)
+ break;
+ case 4:
+ this.state = new ResponseHeader(OffsetsBody)
+ break;
+ }
+ this.cb = cb
+ this.request = request
+ this.complete = false
+ }
+
+ Response.prototype.complete = function () {
+ return complete
+ }
+
+ Response.prototype.read = function (stream) {
+ while (this.state.read(stream)) {
+ var result = this.state.next()
+ if (Array.isArray(result)) { // TODO: better
+ // something something
+ this.cb(result)
+ this.complete = true
+ break;
+ }
+ else {
+ this.state = result
+ }
+ }
+ return this.complete
+ }
+
+ return Response
+}
29 protocol/state.js
@@ -0,0 +1,29 @@
+module.exports = function () {
+
+ function State(bytes) {
+ this.remainingBytes = bytes
+ this.buffer = new Buffer(bytes)
+ }
+
+ State.prototype.complete = function () {
+ return this.remainingBytes === 0
+ }
+
+ State.prototype.read = function (stream) {
+ if (this.complete()) { return true }
+ var data = stream.read(this.remainingBytes)
+ if (!data) { return false }
+ console.assert(data.length <= this.remainingBytes)
+ data.copy(this.buffer, this.buffer.length - this.remainingBytes)
+ this.remainingBytes = this.remainingBytes - data.length
+ return this.complete()
+ }
+
+ State.prototype.next = function () { return this }
+
+ State.prototype.toString = function () {
+ return "State " + this.complete() + " " + this.buffer.toString('hex')
+ }
+
+ return State
+}
Please sign in to comment.
Something went wrong with that request. Please try again.