forked from matthewfl/node-host
/
db.nstore.js
124 lines (114 loc) · 2.96 KB
/
db.nstore.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
var net = require('net');
//var nStore = require('./lib/nstore');
var config = require('./config');
// Set to true by exports._isMaster(); while false, every operation is
// forwarded through `queue` to the master process instead of touching `store`.
var master=false;
// nStore handle; only assigned inside exports._isMaster().
var store;
exports.get = function (key, callback) {
if(master)
store.get(key, function (err, doc, meta) { callback(err ? null : doc) });
else
queue.add("r"+key, callback);
};
exports.has = function (key, callback) {
if(master)
store.get(key, function (err) {
callback(!err);
});
else
queue.add("h"+key, function (a) { callback(!!a); });
};
exports.set = function (key, value, callback) {
callback = callback || function () {};
if(typeof value != "string")
value = JSON.stringify(value);
if(master)
store.save(key, value, callback);
else
queue.add("s"+key.length+"\n"+key+value, callback);
};
exports.remove = function (key, callback) {
callback = callback || function () {};
if(master)
store.save(key, null, callback);
else
queue.add("d"+key, callback);
};
exports._isMaster = function () {
master=true;
store = require('./lib/nstore')('./database.db');
net.createServer(function (stream) {
console.log('db connection');
stream.setEncoding('ascii');
stream.on('data', function (data) {
switch(data[0]) {
case 'e':
stream.end();
return;
case 'r':
store.get(data.substring(1).split("\n")[0], function (err, doc, meta) {
stream.write(doc+"\n");
});
return;
case 's':
var length = data.substring(1).split("\n")[0]*1;
var key = data.substring(data.indexOf("\n"),length);
var value = data.substring(length+1);
store.save(key,value,function () {});
stream.write("\n");
return;
case 'd':
store.save(data.substring(1).split("\n")[0], null, function () {});
stream.write("\n");
return;
case 'h':
store.get(data.substring(1).split("\n")[0], function (err, doc, meta) {
stream.write(err ? "0\n" : "1\n");
});
}
});
stream.on('end', function (data) {
stream.end();
});
}).listen(config.testDbPort);
};
/**
 * Client-side request queue used when this process is not the master.
 *
 * Requests are sent to the master over one TCP connection, strictly one at a
 * time: a message is written, `running` stays true until a full response
 * line ("...\n") has been received, the oldest pending callback is invoked
 * with that line (without the newline), and only then is the next message
 * written. Responses are matched to callbacks purely by order.
 *
 * Fields:
 *   connection - the net.Socket, or false when not connected
 *   buffer     - response bytes received but not yet consumed
 *   running    - true while connecting or while a request is in flight
 *   q          - messages waiting to be written
 *   callback   - callbacks awaiting a response, oldest first
 */
var queue = {
  connection: false,
  buffer: "",
  // Lazily opens the connection to the master and wires up its handlers.
  checkConnection: function () {
    if(!queue.connection) {
      // Block writes until the socket reports 'connect'.
      queue.running = true;
      queue.connection = net.createConnection(config.testDbPort);
      queue.connection.on('data', function (data) {
        queue.buffer += data;
        var l = queue.buffer.indexOf("\n");
        if(l != -1) {
          queue.callback.shift()(queue.buffer.substring(0,l));
          queue.buffer = queue.buffer.substring(l+1);
          // Response consumed: allow the next queued request out.
          queue.running = false;
          process.nextTick(queue.doAct);
        }
      });
      queue.connection.on('connect', function () {
        queue.running = false;
        process.nextTick(queue.doAct);
      });
      queue.connection.on('end', function () {
        queue.connection = false;
        // Nothing more will arrive on this socket, so release the busy flag
        // and reconnect if work is still waiting; otherwise queued messages
        // would sit idle until the next add().
        // NOTE(review): a request that was in flight when the socket ended
        // keeps its callback at the head of `callback` — confirm whether it
        // should be dropped or failed explicitly.
        queue.running = false;
        if(queue.q.length)
          process.nextTick(queue.doAct);
      });
    }
  },
  running: false,
  // Writes the next queued message if the connection is idle.
  doAct: function () {
    queue.checkConnection();
    if(!queue.running && queue.q.length) {
      // Mark busy before writing so later add() calls wait for this
      // request's response instead of being written back-to-back.
      queue.running = true;
      queue.connection.write(queue.q.shift() + "\n");
    }
  },
  q: [],
  callback: [],
  // Enqueues `msg` and registers `callback` for its response line.
  add: function (msg, callback) {
    queue.q.push(msg);
    queue.callback.push(callback);
    queue.doAct();
  },
};