Skip to content
This repository was archived by the owner on Dec 2, 2024. It is now read-only.

Commit 64bd1dd

Browse files
committed
use multileveldown to get seamless failover
1 parent 56edf45 commit 64bd1dd

File tree

2 files changed

+87
-132
lines changed

2 files changed

+87
-132
lines changed

index.js

Lines changed: 84 additions & 128 deletions
Original file line numberDiff line numberDiff line change
@@ -1,137 +1,93 @@
11
var level = require('level');
2-
var multilevel = require('multilevel');
3-
var levelProxy = require('level-proxy');
4-
var net = require('net');
2+
var has = require('has');
3+
var pump = require('pump');
54
var fs = require('fs');
5+
var net = require('net');
66
var path = require('path');
7-
var has = require('has');
8-
var manifest = require('level-manifest')({
9-
methods: {
10-
_iteratorCreate: { type: 'async' },
11-
_iteratorNext: { type: 'async' },
12-
_iteratorEnd: { type: 'async' }
13-
}
14-
});
7+
var multileveldown = require('multileveldown');
158

169
module.exports = function (dir, opts) {
17-
var proxy = levelProxy();
18-
withProxy(proxy, dir, opts);
19-
return proxy;
20-
};
10+
if (!opts) opts = {};
11+
if (!has(opts, 'retry')) opts.retry = true;
2112

22-
function withProxy (proxy, dir, opts) {
23-
var db = level(dir, opts);
24-
var sockfile = path.join(dir, 'level-party.sock');
25-
26-
db.once('error', onerror);
27-
db.once('open', onopen);
28-
29-
function onerror (err) {
30-
db.removeListener('open', onopen);
31-
if (err.type === 'OpenError') createStream();
32-
}
33-
34-
function onopen (times) {
35-
db.removeListener('error', onerror);
36-
37-
var server = net.createServer(function (stream) {
38-
var iterators = {};
39-
if (!db.methods) db.methods = {};
40-
41-
db.methods._iteratorCreate = { type: 'async' };
42-
db._iteratorCreate = function (ix, opts) {
43-
iterators[ix] = (db.iterator && db.iterator(opts))
44-
|| (db.db && db.db.iterator && db.db.iterator(opts))
45-
;
46-
};
47-
48-
db.methods._iteratorNext = { type: 'async' };
49-
db._iteratorNext = function (ix, cb) {
50-
if (!has(iterators, ix)) cb(new Error('no such iterator'))
51-
else iterators[ix].next(cb);
52-
};
53-
54-
db.methods._iteratorEnd = { type: 'async' };
55-
db._iteratorEnd = function (ix, cb) {
56-
if (!has(iterators, ix)) cb(new Error('no such iterator'))
57-
else iterators[ix].end(cb)
58-
};
59-
60-
stream.on('error', function (err) { cleanup() });
61-
stream.once('end', cleanup);
62-
stream.pipe(multilevel.server(db)).pipe(stream);
63-
64-
function cleanup () {
65-
Object.keys(iterators).forEach(function (ix) {
66-
iterators[ix].end(function () {});
67-
});
68-
iterators = null;
69-
}
13+
var sockPath = path.join(dir, 'level-party.sock');
14+
var client = multileveldown.client(opts);
15+
16+
client.open(tryConnect);
17+
18+
function tryConnect () {
19+
if (!client.isOpen()) return;
20+
21+
var socket = net.connect(sockPath);
22+
var connected = false;
23+
24+
socket.on('connect', function () {
25+
connected = true;
7026
});
71-
server.listen(sockfile);
72-
73-
server.once('error', onerror);
74-
server.once('listening', onlistening);
75-
76-
function onerror (err) {
77-
server.removeListener('listening', onlistening);
78-
if (!times && err && err.code === 'EADDRINUSE') {
79-
fs.unlink(sockfile, function (err) {
80-
if (err) db.emit('error', err)
81-
else onopen(1)
82-
});
27+
28+
pump(socket, client.createRpcStream(), socket, function () {
29+
if (!client.isOpen()) return;
30+
31+
var db = level(dir, opts);
32+
33+
db.on('error', onerror);
34+
db.on('open', onopen);
35+
36+
function onerror (err) {
37+
db.removeListener('open', onopen);
38+
if (err.type === 'OpenError') {
39+
if (connected) return tryConnect();
40+
setTimeout(tryConnect, 100);
41+
}
8342
}
84-
else db.emit('error', err);
85-
}
86-
87-
function onlistening () {
88-
server.removeListener('error', onerror);
89-
db.close = function () {
90-
proxy.swap(null);
91-
server.close();
92-
};
93-
proxy.swap(db);
94-
proxy.emit('open');
95-
}
96-
}
97-
98-
function createStream () {
99-
var xdb = multilevel.client(manifest);
100-
101-
var iteratorIx = 0;
102-
xdb.iterator = function (opts) {
103-
var ix = iteratorIx ++;
104-
xdb._iteratorCreate(ix, opts);
105-
106-
return { next: next, end: end };
107-
function next (cb) { xdb._iteratorNext(ix, cb) }
108-
function end (cb) { xdb._iteratorEnd(ix, cb) }
109-
};
110-
111-
(function connect () {
112-
var stream = net.connect(sockfile);
113-
stream.on('connect', function () {
114-
xdb.open = function () {};
115-
xdb.close = function () {
116-
proxy.swap(null);
117-
stream.removeListener('end', onend);
118-
stream.end();
119-
};
120-
proxy.swap(xdb);
121-
proxy.emit('open');
122-
});
123-
124-
stream.on('error', function (err) {
125-
stream.removeListener('end', onend);
126-
setTimeout(connect, 50);
127-
});
128-
stream.on('end', onend);
129-
stream.pipe(xdb.createRpcStream()).pipe(stream);
130-
131-
function onend () {
132-
proxy.swap(null);
133-
withProxy(proxy, dir, opts);
43+
44+
function onopen () {
45+
db.removeListener('error', onerror);
46+
fs.unlink(sockPath, function (err) {
47+
if (err && err.code !== 'ENOENT') return db.emit('error', err);
48+
if (!client.isOpen()) return;
49+
50+
var sockets = [];
51+
var down = client.db;
52+
53+
var server = net.createServer(function (sock) {
54+
if (sock.unref) sock.unref();
55+
sockets.push(sock);
56+
pump(sock, multileveldown.server(db), sock, function () {
57+
sockets.splice(sockets.indexOf(sock), 1);
58+
});
59+
});
60+
61+
client.db = db.db;
62+
client.close = shutdown;
63+
client.emit('leader');
64+
65+
server.listen(sockPath, onlistening);
66+
67+
function shutdown (cb) {
68+
sockets.forEach(function (sock) {
69+
sock.destroy();
70+
});
71+
server.on('close', function () {
72+
db.close(cb);
73+
});
74+
server.close();
75+
}
76+
77+
function onlistening () {
78+
if (server.unref) server.unref();
79+
if (down.isFlushed()) return;
80+
81+
var sock = net.connect(sockPath);
82+
pump(sock, down.createRpcStream(), sock);
83+
down.flush(function () {
84+
sock.destroy();
85+
});
86+
}
87+
});
13488
}
135-
})();
136-
}
137-
}
89+
});
90+
};
91+
92+
return client;
93+
};

package.json

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,10 @@
44
"description": "open a leveldb handle multiple times",
55
"main": "index.js",
66
"dependencies": {
7-
"has": "~0.0.1",
7+
"has": "^1.0.0",
88
"level": "~0.18.0",
9-
"level-proxy": "^1.1.1",
10-
"level-manifest": "^1.2.0",
11-
"multilevel": "^6.0.1"
9+
"multileveldown": "^1.0.2",
10+
"pump": "^1.0.0"
1211
},
1312
"devDependencies": {
1413
"tape": "^3.0.0",

0 commit comments

Comments
 (0)