This repository has been archived by the owner on Jan 26, 2018. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 16
/
client.js
154 lines (135 loc) · 3.61 KB
/
client.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
module.exports = function (
logger,
net,
inherits,
EventEmitter,
ReadableStream,
Message,
Receiver,
FetchRequest,
ProduceRequest,
OffsetsRequest
) {
function Client(id, options) {
this.id = id
this.ready = false
this.readableSteam = null
this.receiver = null
this.connection = net.connect(options)
this.onConnectionConnect = connectionConnect.bind(this)
this.onConnectionEnd = connectionEnd.bind(this)
this.onConnectionDrain = connectionDrain.bind(this)
this.onConnectionError = connectionError.bind(this)
this.onConnectionClose = connectionClose.bind(this)
this.connection.on('connect', this.onConnectionConnect)
this.connection.on('end', this.onConnectionEnd)
this.connection.on('drain', this.onConnectionDrain)
this.connection.on('error', this.onConnectionError)
this.connection.on('close', this.onConnectionClose)
EventEmitter.call(this)
}
inherits(Client, EventEmitter)
Client.prototype.drain = function (cb) {
logger.info('draining', this.id)
this.receiver.close(
function () {
logger.info('drained', this.id)
// XXX is reopening correct here?
this.receiver.open()
cb()
}.bind(this)
)
}
Client.prototype.end = function () {
this.connection.end()
}
Client.prototype._send = function (request, cb) {
request.serialize(
this.connection,
afterSend.bind(this, request, cb)
)
return this.ready
}
function afterSend(request, cb, err, written) {
if (err) {
this.ready = false
return cb(err)
}
if (!written) {
logger.info('connection', 'wait')
this.ready = false
}
this.receiver.push(request, cb)
}
// cb: function (err, length, messages) {}
Client.prototype.fetch = function (topic, partition, cb) {
logger.info(
'fetching', topic.name,
'broker', this.id,
'partition', partition.id
)
return this._send(new FetchRequest(topic, partition), cb)
}
// topic: a Topic object
// messages: array of: string, Buffer, Message
// partition: number
Client.prototype.write = function (topic, messages, partitionId, cb) {
logger.info(
'publishing', topic.name,
'messages', messages.length,
'broker', this.id,
'partition', partitionId
)
return this._send(
new ProduceRequest(
topic,
partitionId,
messages.map(Message.create)
),
cb
)
}
Client.prototype.offsets = function (topic, partition, time, maxCount, cb) {
logger.info(
'offsets', time,
'topic', topic.name,
'broker', this.id,
'partition', partition.id
)
return this._send(new OffsetsRequest(topic, partition, time, maxCount), cb)
}
Client.compression = Message.compression
Client.nil = { ready: false }
function connectionConnect() {
logger.info('client connect')
this.readableSteam = new ReadableStream()
this.readableSteam.wrap(this.connection)
this.receiver = new Receiver(this.readableSteam)
this.ready = true
this.emit('connect')
}
function connectionEnd() {
logger.info('client end')
this.ready = false
this.emit('end')
}
function connectionDrain() {
if (!this.ready) { //TODO: why is connection.drain so frequent?
this.ready = true
this.emit('ready')
}
}
function connectionError(err) {
logger.info('client error', err.message)
}
function connectionClose(hadError) {
logger.info('client closed. with error', hadError)
this.connection.removeListener('connect', this.onConnectionConnect)
this.connection.removeListener('end', this.onConnectionEnd)
this.connection.removeListener('drain', this.onConnectionDrain)
this.connection.removeListener('error', this.onConnectionError)
this.connection.removeListener('close', this.onConnectionClose)
this.emit('end')
}
return Client
}