Skip to content

Commit

Permalink
Move logic to ensure directory into util function
Browse files Browse the repository at this point in the history
  • Loading branch information
albe committed May 15, 2021
1 parent b058ae2 commit d2cea70
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 24 deletions.
7 changes: 2 additions & 5 deletions src/Consumer.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
const stream = require('stream');
const fs = require('fs');
const path = require('path');
const mkdirpSync = require('mkdirp').sync;
const { assert } = require('./util');
const { assert, ensureDirectory } = require('./util');

const Storage = require('./Storage/ReadableStorage');
const MAX_CATCHUP_BATCH = 10;
Expand Down Expand Up @@ -59,9 +58,7 @@ class Consumer extends stream.Readable {
this.indexName = indexName;
const consumerDirectory = path.join(this.storage.indexDirectory, 'consumers');
this.fileName = path.join(consumerDirectory, this.storage.storageFile + '.' + indexName + '.' + identifier);
if (!fs.existsSync(consumerDirectory)) {
mkdirpSync(consumerDirectory);
} else {
if (ensureDirectory(consumerDirectory)) {
this.cleanUpFailedWrites();
}
}
Expand Down
7 changes: 2 additions & 5 deletions src/Index/WritableIndex.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
const fs = require('fs');
const mkdirpSync = require('mkdirp').sync;
const ReadableIndex = require('./ReadableIndex');
const { assertEqual, buildMetadataHeader } = require('../util');
const { assertEqual, buildMetadataHeader, ensureDirectory } = require('../util');

/**
* An index is a simple append-only file that stores an ordered list of entry elements pointing to the actual file position
Expand Down Expand Up @@ -45,9 +44,7 @@ class WritableIndex extends ReadableIndex {
*/
initialize(options) {
super.initialize(options);
if (!fs.existsSync(options.dataDirectory)) {
mkdirpSync(options.dataDirectory);
}
ensureDirectory(options.dataDirectory);

this.fileMode = 'a+';
this.writeBuffer = Buffer.allocUnsafe(options.writeBufferSize >>> 0); // jshint ignore:line
Expand Down
7 changes: 2 additions & 5 deletions src/Partition/WritablePartition.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
const fs = require('fs');
const mkdirpSync = require('mkdirp').sync;
const ReadablePartition = require('./ReadablePartition');
const { assert, buildMetadataHeader, alignTo } = require('../util');
const { assert, buildMetadataHeader, alignTo, ensureDirectory } = require('../util');
const Clock = require('../Clock');

const DEFAULT_WRITE_BUFFER_SIZE = 16 * 1024;
Expand Down Expand Up @@ -40,9 +39,7 @@ class WritablePartition extends ReadablePartition {
config.metadata = Object.assign(defaults.metadata, config.metadata);
config = Object.assign(defaults, config);
super(name, config);
if (!fs.existsSync(this.dataDirectory)) {
mkdirpSync(this.dataDirectory);
}
ensureDirectory(this.dataDirectory);
this.fileMode = 'a+';
this.writeBufferSize = config.writeBufferSize >>> 0; // jshint ignore:line
this.maxWriteBufferDocuments = config.maxWriteBufferDocuments >>> 0; // jshint ignore:line
Expand Down
10 changes: 2 additions & 8 deletions src/Storage/WritableStorage.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
const fs = require('fs');
const mkdirpSync = require('mkdirp').sync;
const path = require('path');
const WritablePartition = require('../Partition/WritablePartition');
const WritableIndex = require('../Index/WritableIndex');
const ReadableStorage = require('./ReadableStorage');
const { assert, matches, buildMetadataForMatcher, buildMatcherFromMetadata } = require('../util');
const { assert, matches, buildMetadataForMatcher, buildMatcherFromMetadata, ensureDirectory } = require('../util');

const DEFAULT_WRITE_BUFFER_SIZE = 16 * 1024;

Expand Down Expand Up @@ -57,12 +56,7 @@ class WritableStorage extends ReadableStorage {
};
config = Object.assign(defaults, config);
config.indexOptions = Object.assign({ syncOnFlush: config.syncOnFlush }, config.indexOptions);
if (!fs.existsSync(config.dataDirectory)) {
try {
mkdirpSync(config.dataDirectory);
} catch (e) {
}
}
ensureDirectory(config.dataDirectory);
super(storageName, config);

this.lockFile = path.resolve(this.dataDirectory, this.storageFile + '.lock');
Expand Down
26 changes: 25 additions & 1 deletion src/util.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
const crypto = require('crypto');
const fs = require('fs');
const mkdirpSync = require('mkdirp').sync;

/**
* Assert that actual and expected match or throw an Error with the given message appended by information about expected and actual value.
Expand Down Expand Up @@ -183,6 +185,27 @@ function wrapAndCheck(index, length) {
return index;
}

/**
* Ensure that the given directory exists.
* @param {string} dirName
* @return {boolean} true if the directory existed already
*/
function ensureDirectory(dirName) {
if (!fs.existsSync(dirName)) {
try {
mkdirpSync(dirName);
} catch (e) {
if (e.code !== 'EEXIST') {
throw e;
}
return true;
}
return false;
}
return true;
}


module.exports = {
assert,
assertEqual,
Expand All @@ -193,5 +216,6 @@ module.exports = {
buildMetadataForMatcher,
buildMatcherFromMetadata,
buildMetadataHeader,
alignTo
alignTo,
ensureDirectory
};

0 comments on commit d2cea70

Please sign in to comment.