-
Notifications
You must be signed in to change notification settings - Fork 0
/
index.spec.ts
72 lines (63 loc) · 1.83 KB
/
index.spec.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import { expect } from 'chai';
import { timer } from '@biorate/tools';
import { Message } from 'node-rdkafka';
import { root } from './__mocks__';
/**
 * Integration suite for the `@biorate/rdkafka` connectors exposed by the mocked `root`
 * (admin client, producer, consumer, consumer stream, high-level producer).
 *
 * The tests are order-dependent: the topic is created first, its partitions are
 * extended next, and the produce/consume tests then use that topic.
 * NOTE(review): presumably requires a reachable Kafka broker configured in `./__mocks__` - confirm.
 */
describe('@biorate/rdkafka', function () {
  const topic = 'test';
  const timeout = 5000;

  // No per-test time limit is applied to this suite.
  // NOTE(review): combined with the unbounded polling loop in 'produce / consume',
  // a message that never arrives makes the run hang instead of fail.
  this.timeout(Infinity);

  /**
   * Best-effort removal of the test topic.
   * Errors are intentionally ignored (e.g. presumably when the topic does not exist yet).
   */
  async function cleanup() {
    try {
      await root.admin!.current!.deleteTopic(topic, timeout / 2);
    } catch {
      // Deliberately swallowed: cleanup must never fail the suite.
    }
  }

  before(async () => {
    await root.$run();
    await cleanup();
  });

  after(async () => {
    await cleanup();
    // `process.exit()` without an argument exits with `process.exitCode` (0 when unset),
    // and it runs before Mocha itself reports the failure count. Propagate failures of
    // this suite explicitly so a red run is not reported as success.
    // `this` is the Mocha suite here: arrow functions inherit it from the `describe` callback.
    if (this.tests.some((test) => test.state === 'failed')) process.exitCode = 1;
    // The process is terminated explicitly.
    // NOTE(review): presumably because open Kafka clients keep the event loop alive - confirm.
    process.exit();
  });

  it('AdminClient #createTopic', async () => {
    await root.admin!.current!.createTopic(
      {
        topic,
        num_partitions: 1,
        replication_factor: 1,
      },
      timeout / 2,
    );
  });

  it('AdminClient #createPartitions', async () => {
    await root.admin!.current!.createPartitions(topic, 3, timeout / 2);
  });

  it('produce / consume', async () => {
    root.producer!.current!.produce(topic, null, Buffer.from('hello world!'));
    root.consumer!.current!.subscribe([topic]);
    // Poll until at least one message is returned, then commit it and stop.
    while (true) {
      await timer.wait();
      const messages = await root.consumer!.current!.consumePromise(1);
      if (!messages.length) continue;
      root.consumer!.current!.commitMessageSync(messages[0]);
      root.consumer!.current!.unsubscribe();
      break;
    }
  });

  it('produce / consume stream', (done) => {
    // Guards against the handler firing again before unsubscription has completed,
    // which would otherwise call `done` more than once.
    let finished = false;
    root.producer!.current!.produce(topic, null, Buffer.from('hello world!'));
    root.consumerStream!.current!.subscribe(async (_message: Message | Message[]) => {
      if (finished) return;
      finished = true;
      try {
        await root.consumerStream!.current!.unsubscribe();
      } catch (e) {
        // Report the failure to Mocha instead of leaving an unhandled rejection
        // and a test that never completes.
        done(e);
        return;
      }
      done();
    });
  });

  it('high level producer', async () => {
    const res = await root.highLevelProducer!.current!.producePromise(
      topic,
      null,
      Buffer.from('hello world!'),
      null,
      Date.now(),
    );
    expect(res).to.be.a('number');
  });
});