This repository has been archived by the owner on Jan 26, 2018. It is now read-only.
/
offsets-request.js
57 lines (52 loc) · 1.72 KB
/
offsets-request.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
module.exports = function (
RequestHeader,
Response,
OffsetsBody,
int53) {
/**
 * Request for the offsets of a single topic partition.
 *
 * The constructor only records its arguments; nothing is validated or
 * encoded until `serialize` is called.
 *
 * @param {Object} topic - Stored as `this.topic` (`serialize` reads its `name`).
 * @param {Object} partition - Stored as `this.partition` (`serialize` reads its `id`).
 * @param {number} time - Stored as `this.time`; per the wire-format notes in
 *   this file: milliseconds since the UNIX epoch, -1 = latest, -2 = earliest.
 * @param {number} maxCount - Stored as `this.maxCount`; upper bound on the
 *   number of offsets to return.
 */
function OffsetsRequest(topic, partition, time, maxCount) {
  var request = this
  // Assignment order is kept so property enumeration order is unchanged.
  request.topic = topic
  request.partition = partition
  request.time = time
  request.maxCount = maxCount
}
//  0                   1                   2                   3
//  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// /                        REQUEST HEADER                         /
// /                                                               /
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// |                             TIME                              |
// |                                                               |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// |                    MAX_NUMBER (of OFFSETS)                    |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
//
// TIME       = int64 // Milliseconds since UNIX Epoch.
//                    // -1 = LATEST
//                    // -2 = EARLIEST
// MAX_NUMBER = int32 // Return up to this many offsets
/**
 * Encodes this request and writes it to `stream`: first the request header
 * (via `RequestHeader#serialize`), then a 12-byte payload holding `time` as a
 * 64-bit big-endian integer followed by `maxCount` as a 32-bit big-endian
 * unsigned integer at byte offset 8.
 *
 * Every failure — encoding the payload, building the header, or writing to
 * the stream — is reported through `cb` rather than thrown, so callers have a
 * single error path. The payload is fully encoded before anything is written,
 * so an encoding failure leaves the stream untouched.
 *
 * @param {Object} stream - Destination; must provide `write(buffer)`.
 * @param {Function} cb - Invoked exactly once, synchronously, as
 *   `cb(err, written)`. `err` is `null` on success. `written` is the return
 *   value of `stream.write(payload)`, or `undefined` if a failure happened
 *   before that call returned.
 */
OffsetsRequest.prototype.serialize = function (stream, cb) {
  var err = null
  var written
  try {
    // `new Buffer(size)` is deprecated; prefer Buffer.alloc and only fall
    // back to the old constructor on runtimes that predate it.
    var payload = typeof Buffer.alloc === 'function'
      ? Buffer.alloc(12)
      : new Buffer(12)
    // No offset is passed, so TIME presumably lands in bytes 0-7.
    int53.writeInt64BE(this.time, payload)
    payload.writeUInt32BE(this.maxCount, 8)
    var header = new RequestHeader(
      payload.length,
      RequestHeader.types.OFFSETS,
      this.topic.name,
      this.partition.id
    )
    header.serialize(stream)
    written = stream.write(payload)
  }
  catch (e) {
    err = e
  }
  // Kept outside the try so an exception thrown by the callback itself is
  // not caught here and cannot trigger a second callback invocation.
  cb(err, written)
}
/**
 * Creates the object that will handle the reply to this request.
 *
 * @param {Function} cb - Passed through unchanged as the second argument of
 *   the `Response` constructor.
 * @returns {Response} A new `Response` built with `OffsetsBody` and `cb`.
 */
OffsetsRequest.prototype.response = function (cb) {
  var reply = new Response(OffsetsBody, cb)
  return reply
}
return OffsetsRequest
}