Skip to content
This repository was archived by the owner on Jan 9, 2023. It is now read-only.

Commit 3c6767b

Browse files
Bugfix/issue 95 worker mem leak 2 (#106)
* Refactored to remove the ClusterMessages module altogether - it was adding too much complexity and memory overhead for only a slight improvement in readability over just using the Cluster module directly as necessary. * Remove file ref count logic from CacheFS module. Memory usage was scaling with project size which eventually resulted in unacceptable memory usage. Also fixed a couple of warnings and errors in the test suite.
1 parent 747775c commit 3c6767b

File tree

10 files changed

+51
-182
lines changed

10 files changed

+51
-182
lines changed

lib/cache/cache_fs.js

Lines changed: 0 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -9,25 +9,8 @@ const moment = require('moment');
99
const pick = require('lodash').pick;
1010
const crypto = require('crypto');
1111
const fileExtensions = require('lodash').invert(consts.FILE_TYPE);
12-
const cluster = require('cluster');
13-
const cm = require('../cluster_messages');
14-
15-
const k_incrementGuidRef = "_incrementGuidRef";
16-
const k_releaseGuidRef = "_releaseGuidRef";
1712

1813
class CacheFS extends CacheBase {
19-
constructor() {
20-
super();
21-
this._guidRefs = {};
22-
23-
cm.listenFor(k_incrementGuidRef, key => {
24-
this._incrementGuidRef(key);
25-
});
26-
27-
cm.listenFor(k_releaseGuidRef, key => {
28-
this._releaseGuidRef(key);
29-
});
30-
}
3114

3215
get _optionsPath() {
3316
return super._optionsPath + ".cache_fs";
@@ -65,39 +48,6 @@ class CacheFS extends CacheBase {
6548
return result;
6649
}
6750

68-
/**
69-
*
70-
* @param {String} key
71-
* @private
72-
*/
73-
_incrementGuidRef(key) {
74-
if(cluster.isWorker) {
75-
return cm.send(k_incrementGuidRef, key);
76-
}
77-
78-
if(this._guidRefs.hasOwnProperty(key)) {
79-
this._guidRefs[key]++;
80-
}
81-
else {
82-
this._guidRefs[key] = 1;
83-
}
84-
}
85-
86-
/**
87-
*
88-
* @param {String} key
89-
* @private
90-
*/
91-
_releaseGuidRef(key) {
92-
if(cluster.isWorker) {
93-
return cm.send(k_releaseGuidRef, key);
94-
}
95-
96-
if(this._guidRefs.hasOwnProperty(key)) {
97-
this._guidRefs[key]--;
98-
}
99-
}
100-
10151
/**
10252
*
10353
* @param {String} type
@@ -123,10 +73,6 @@ class CacheFS extends CacheBase {
12373
async _writeFileToCache(type, guid, hash, sourcePath) {
12474
const filePath = this._calcFilepath(type, guid, hash);
12575

126-
if(this._guidRefs.hasOwnProperty(filePath) && this._guidRefs[filePath] > 0) {
127-
throw new Error(`File is busy, cannot overwrite ${key}`);
128-
}
129-
13076
await fs.move(sourcePath, filePath, { overwrite: true });
13177
return filePath;
13278
}
@@ -143,13 +89,10 @@ class CacheFS extends CacheBase {
14389

14490
return new Promise((resolve, reject) => {
14591
stream.on('open', () => {
146-
this._incrementGuidRef(key);
14792
resolve(stream);
14893
}).on('error', err => {
14994
helpers.log(consts.LOG_ERR, err);
15095
reject(err);
151-
}).on('close', () => {
152-
this._releaseGuidRef(key);
15396
});
15497
});
15598
}

lib/cache/reliability_manager.js

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
const cluster = require('cluster');
22
const path = require('path');
33
const helpers = require('../helpers');
4-
const cm = require('../cluster_messages');
54
const consts = require('../constants');
65
const crypto = require('crypto');
76
const _ = require('lodash');
@@ -15,6 +14,8 @@ const defaultOptions = {
1514

1615
class ReliabilityManager {
1716
constructor(db, cachePath, options) {
17+
this._id = ++ReliabilityManager._idCounter;
18+
this._kMsgUpdateReliabilityFactor = `_updateReliabilityFactorForVersion.${this._id}`;
1819
this._db = db;
1920
this._cachePath = cachePath;
2021
this._options = options || {};
@@ -30,9 +31,13 @@ class ReliabilityManager {
3031
}
3132
}
3233

33-
cm.listenFor("_updateReliabilityFactorForVersion", (data) => {
34-
return this._updateReliabilityFactorForVersion(data);
35-
});
34+
if(cluster.isMaster) {
35+
cluster.on('message', (worker, msg) => {
36+
if(msg._msg === this._kMsgUpdateReliabilityFactor) {
37+
return this._updateReliabilityFactorForVersion(msg);
38+
}
39+
});
40+
}
3641
}
3742

3843
/**
@@ -43,7 +48,8 @@ class ReliabilityManager {
4348
*/
4449
async _updateReliabilityFactorForVersion(params) {
4550
if(cluster.isWorker) {
46-
return cm.send('_updateReliabilityFactorForVersion', params);
51+
params._msg = this._kMsgUpdateReliabilityFactor;
52+
return process.send(params);
4753
}
4854

4955
const entry = this.getEntry(params.guidStr, params.hashStr, true);
@@ -160,4 +166,6 @@ ReliabilityManager.reliabilityStates = {
160166
Reliable: 'Reliable'
161167
};
162168

169+
ReliabilityManager._idCounter = 0;
170+
163171
module.exports = ReliabilityManager;

lib/cluster_messages.js

Lines changed: 0 additions & 87 deletions
This file was deleted.

lib/server/command_processor.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ class CommandProcessor extends Duplex {
113113
if(this[kReadStream] === null || this[kSource] === null) return;
114114

115115
let chunk;
116-
while(chunk = this[kReadStream].read()) {
116+
while((chunk = this[kReadStream].read()) !== null) {
117117
this._sendFileQueueChunkReads++;
118118
this._sendFileQueueReadBytes += chunk.length;
119119
if(!this.push(chunk, 'ascii')) break;

test/cache_api.js

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ const loki = require('lokijs');
66
const fs = require('fs-extra');
77
const sleep = require('./test_utils').sleep;
88
const generateCommandData = require('./test_utils').generateCommandData;
9-
const readStream = require('./test_utils').readStream;
109
const sinon = require('sinon');
1110
const crypto = require('crypto');
1211
const consts = require('../lib/constants');
@@ -147,32 +146,6 @@ describe("Cache API", function() {
147146
.then(() => { throw new Error("Expected error!"); }, err => assert(err));
148147
});
149148

150-
it("should throw an error when trying to replace a file that is open for reading", async () => {
151-
const TEST_FILE_SIZE = 1024 * 64 * 2;
152-
153-
const fData = generateCommandData(TEST_FILE_SIZE, TEST_FILE_SIZE);
154-
155-
// Add a file to the cache (use the info data)
156-
let trx = await cache.createPutTransaction(fData.guid, fData.hash);
157-
let wStream = await trx.getWriteStream(consts.FILE_TYPE.INFO, fData.info.length);
158-
await new Promise(resolve => wStream.end(fData.info, resolve));
159-
await cache.endPutTransaction(trx);
160-
161-
// Get a read stream
162-
const rStream = await cache.getFileStream(consts.FILE_TYPE.INFO, fData.guid, fData.hash);
163-
164-
// Read a byte
165-
await new Promise(resolve => rStream.once('readable', () => resolve(rStream.read(1))));
166-
167-
// Try to replace the file (use the resource data)
168-
trx = await cache.createPutTransaction(fData.guid, fData.hash);
169-
wStream = await trx.getWriteStream(consts.FILE_TYPE.INFO, fData.resource.length);
170-
await new Promise(resolve => wStream.end(fData.resource, resolve));
171-
172-
cache.endPutTransaction(trx).then(() => { throw new Error("Expected error"); }, (err) => assert(err))
173-
.then(() => rStream.destroy());
174-
});
175-
176149
describe("High Reliability Mode", () => {
177150
before(async () => {
178151
const opts = cache._options;

test/cache_fs.js

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,10 @@ describe("Cache: FS", () => {
4848
cache = new Cache();
4949
});
5050

51-
afterEach(() => fs.remove(cacheOpts.cachePath));
51+
afterEach(async () => {
52+
await cache.shutdown();
53+
return fs.remove(cacheOpts.cachePath)
54+
});
5255

5356
describe("cleanup", function() {
5457
this.slow(500);
@@ -189,7 +192,7 @@ describe("Cache: FS", () => {
189192

190193
await cache.init(opts);
191194
const file = await addFileToCache(moment().toDate());
192-
cache.cleanup(true);
195+
await cache.cleanup(true);
193196
assert(await fs.pathExists(file.path));
194197
});
195198

test/cache_ram.js

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,33 @@ describe("Cache: RAM", function() {
112112
await cache.endPutTransaction(trx);
113113
assert(ok);
114114
});
115+
116+
it("should throw an error when trying to replace a file that is open for reading", async () => {
117+
const TEST_FILE_SIZE = 1024 * 64 * 2;
118+
119+
const fData = generateCommandData(TEST_FILE_SIZE, TEST_FILE_SIZE);
120+
121+
// Add a file to the cache (use the info data)
122+
await cache.init(opts);
123+
let trx = await cache.createPutTransaction(fData.guid, fData.hash);
124+
let wStream = await trx.getWriteStream(consts.FILE_TYPE.INFO, fData.info.length);
125+
await new Promise(resolve => wStream.end(fData.info, resolve));
126+
await cache.endPutTransaction(trx);
127+
128+
// Get a read stream
129+
const rStream = await cache.getFileStream(consts.FILE_TYPE.INFO, fData.guid, fData.hash);
130+
131+
// Read a byte
132+
await new Promise(resolve => rStream.once('readable', () => resolve(rStream.read(1))));
133+
134+
// Try to replace the file (use the resource data)
135+
trx = await cache.createPutTransaction(fData.guid, fData.hash);
136+
wStream = await trx.getWriteStream(consts.FILE_TYPE.INFO, fData.resource.length);
137+
await new Promise(resolve => wStream.end(fData.resource, resolve));
138+
139+
return cache.endPutTransaction(trx).then(() => { throw new Error("Expected error"); }, (err) => assert(err))
140+
.then(() => rStream.destroy());
141+
});
115142
});
116143

117144
describe("shutdown", () => {
@@ -144,7 +171,8 @@ describe("Cache: RAM", function() {
144171

145172
});
146173

147-
afterEach(() => {
174+
afterEach(async () => {
175+
await cache.shutdown();
148176
cache._clearCache();
149177
return fs.remove(opts.cachePath);
150178
});

test/server.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ describe("Server common", function() {
140140

141141
it("should log an error if the error code is 'EADDRINUSE", async () => {
142142
return new Promise((resolve, reject) => {
143-
server.errCallback = e => {}
143+
server.errCallback = e => {};
144144
helpers.setLogger((lvl, msg) => {
145145
/already in use/.test(msg) ? resolve() : reject();
146146
});

test/test_init.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
const helpers = require('../lib/helpers');
2+
require('cluster').setMaxListeners(25);
23

34
process.on('unhandledRejection', (reason) => {
45
console.error(reason);

test/test_utils.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ exports.readStream = function(stream, size) {
124124
const buffer = Buffer.alloc(size, 0, 'ascii');
125125
stream.on('readable', function() {
126126
let data;
127-
while(data = this.read()) {
127+
while((data = this.read()) !== null) {
128128
if(pos + data.length > size) {
129129
reject(new Error("Stream size exceeds buffer size allocation"));
130130
}

0 commit comments

Comments
 (0)