Add stream support in storage clients. Closes #1702 (#1703)
* Add stream support in storage clients (closes #1702)

1. Extend BlobClient to support streams
2. Add getStream and putStream methods to the StorageBackend clients (later renamed getFileStream/putFileStream; see the usage sketch below)
3. Update tests for streams in StorageBackends.spec.js

* WIP- Update aws-sdk and remove promisify in S3Client

* WIP- Skip unit test for gme client

* Refactor stream support files and tests

1. Move StorageHelpers.js to deepforge/storage
2. Rewrite StreamBlobClient as an ES6 class without changing its constructor signature
3. Rename the method checkStreamsInBrowser to ensureStreamSupport (synchronous)
4. Add a StringWritable class in StorageBackends.spec.js and verify content in memory
5. Don't skip the GMEStorage unit test as its prototype has getBlobClientParams

* WIP- Fix getBlobClientParameters signature

* WIP- Remove StreamBlobClient from src/common

* Refactor StorageHelpers.js back to StorageClient.js

* Pin webgme to v2.42.0

* WIP- Add Readable stream check before putStream

* WIP- Refactor getFile -> getFileStream and putFile -> putFileStream in the storage clients

* WIP- Add uniform error messages in StorageClient.js

* WIP- Change Storage.getStream to Storage.getFileStream
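
To make the end result concrete, here is a minimal usage sketch of the new stream methods (not part of the commit). It assumes a Node.js script, since the base client rejects streams in the browser; the backend name, logger, config, and file paths are placeholders, and the getClient/getFileStream calls mirror src/common/storage/index.js and the integration tests:

// Illustrative sketch only — not part of the commit. Backend name ('gme'),
// logger, config, and paths are placeholders; streams require Node.js because
// ensureStreamSupport throws in the browser.
const fs = require('fs');
const {requirejs} = require('./test/globals');   // hypothetical path to the test fixtures
const Storage = requirejs('deepforge/storage/index');

async function roundTrip(logger, config) {
    const client = await Storage.getClient('gme', logger, config);

    // Upload from a Readable stream; resolves to a dataInfo describing the stored file.
    const dataInfo = await client.putFileStream(
        'storageFeaturesSpec/data.csv',
        fs.createReadStream('./data.csv')
    );

    // Download as a Readable stream and pipe it wherever it is needed.
    const download = await Storage.getFileStream(dataInfo, logger, config);
    download.pipe(fs.createWriteStream('./data.copy.csv'));
    return dataInfo;
}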
umesh-timalsina committed Jul 23, 2020
1 parent 9e9dca1 commit 41e915d
Showing 6 changed files with 157 additions and 19 deletions.
23 changes: 22 additions & 1 deletion src/common/storage/backends/StorageClient.js
@@ -1,14 +1,15 @@
/* globals define*/
define([
'client/logger',
'deepforge/gmeConfig'
'deepforge/gmeConfig',
], function(
Logger,
gmeConfig
) {
const fetch = require.isBrowser ? window.fetch :
require.nodeRequire('node-fetch');
const Headers = require.isBrowser ? window.Headers : fetch.Headers;
const stream = require.isBrowser ? null : require.nodeRequire('stream');
const StorageClient = function(id, name, logger) {
this.id = id;
this.name = name;
@@ -48,10 +49,18 @@ define([
throw new Error(`File download not implemented for ${this.name}`);
};

StorageClient.prototype.getFileStream = async function(/*dataInfo*/) {
throw new Error(`Stream download not implemented for ${this.name}`);
};

StorageClient.prototype.putFile = async function(/*filename, content*/) {
throw new Error(`File upload not supported by ${this.name}`);
};

StorageClient.prototype.putFileStream = async function(/*filename, stream*/) {
throw new Error(`Stream upload not supported by ${this.name}`);
};

StorageClient.prototype.deleteFile = async function(/*dataInfo*/) {
throw new Error(`File deletion not supported by ${this.name}`);
};
@@ -87,5 +96,17 @@ define([
throw new Error(`stat not implemented for ${this.name}`);
};

StorageClient.prototype.ensureStreamSupport = function() {
if(require.isBrowser) {
throw new Error('Streams are not supported in browser');
}
};

StorageClient.prototype.ensureReadableStream = function (obj) {
if(stream && !(obj instanceof stream.Readable)) {
throw new Error(`${obj} should be an instance of a readable stream`);
}
};

return StorageClient;
});
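
The two guards above suggest a simple consumer-side pattern (not part of this commit): probe for stream support and fall back to the existing buffer-based methods when streams are unavailable, e.g. in the browser. A hypothetical helper:

// Hypothetical fallback sketch built on the guards shown above.
async function uploadContent(client, filename, content) {
    try {
        client.ensureStreamSupport();           // throws when require.isBrowser is true
        client.ensureReadableStream(content);   // throws unless content is a stream.Readable
        return await client.putFileStream(filename, content);
    } catch (err) {
        // In the browser (or for non-stream content) fall back to putFile,
        // which accepts in-memory content such as Buffers and ArrayBuffers.
        return await client.putFile(filename, content);
    }
}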
13 changes: 13 additions & 0 deletions src/common/storage/backends/gme/Client.js
@@ -35,11 +35,24 @@ define([
return await this.blobClient.getObject(data);
};

GMEStorage.prototype.getFileStream = async function(dataInfo) {
const url = await this.getDownloadURL(dataInfo);
const response = await this.fetch(url, {method: 'GET'});
return response.body;
};

GMEStorage.prototype.putFile = async function(filename, content) {
const hash = await this.blobClient.putFile(filename, content);
return this.createDataInfo(hash);
};

GMEStorage.prototype.putFileStream = async function(filename, stream) {
this.ensureStreamSupport();
this.ensureReadableStream(stream);
const hash = await this.blobClient.putFile(filename, stream);
return this.createDataInfo(hash);
};

GMEStorage.prototype.deleteDir =
GMEStorage.prototype.deleteFile = async function() {};

36 changes: 35 additions & 1 deletion src/common/storage/backends/s3/Client.js
@@ -63,7 +63,6 @@ define([
this.logger.error(`Failed to create bucket ${this.bucketName} in S3 server.`);
throw err;
}

}
};

@@ -78,6 +77,16 @@
return data.Body;
};

S3Storage.prototype.getFileStream = async function(dataInfo) {
const {endpoint, bucketName, filename} = dataInfo.data;
const {accessKeyId, secretAccessKey} = this.config;
const s3Client = await this.getS3Client({endpoint, accessKeyId, secretAccessKey});
return s3Client.getObject({
Bucket: bucketName,
Key: filename
}).createReadStream();
};

S3Storage.prototype.putFile = async function (filename, content) {
const s3Client = await this.getS3Client();
await this.createBucketIfNeeded(s3Client);
@@ -98,6 +107,31 @@
return dataInfo;
};

S3Storage.prototype.putFileStream = async function(filename, stream) {
this.ensureStreamSupport();
this.ensureReadableStream(stream);
const s3Client = await this.getS3Client();
const params = await this.getUploadParams(s3Client, filename, stream);
try {
await s3Client.upload(params).promise();
} catch (err) {
throw new Error(`Unable to upload ${stream}: ${err.message}`);
}
const dataInfo = await this.stat(filename);
this.logger.debug(`Successfully uploaded file ${filename} to the S3 server using stream`);
return dataInfo;
};

S3Storage.prototype.getUploadParams = async function(s3Client, filename, body) {
await this.createBucketIfNeeded(s3Client);
this.logger.debug(`Created bucket ${this.bucketName}`);
return {
Body: require.isBrowser ? new Blob([body]) : body,
Bucket: this.bucketName,
Key: filename,
};
};

S3Storage.prototype.deleteDir = async function (dirname) {
const s3Client = await this.getS3Client();
const {Contents} = await s3Client.listObjectsV2({
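A note on the choice of s3Client.upload() in putFileStream above: in the v2 AWS SDK for JavaScript, the managed upload() call accepts a Readable body of unknown length and handles multipart uploads automatically, whereas putObject would need the content length up front — exactly what a stream cannot provide. (The browser-side Blob wrapping in getUploadParams appears defensive, since the stream methods currently throw in the browser anyway.)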
23 changes: 20 additions & 3 deletions src/common/storage/backends/sciserver-files/Client.js
@@ -18,16 +18,19 @@ define([
SciServerFiles.prototype = Object.create(StorageClient.prototype);

SciServerFiles.prototype.getFile = async function (dataInfo) {
let {volume, filename, volumePool='Storage'} = dataInfo.data;
const url = `file/${volumePool}/${volume}/${filename}`;
const response = await this.fetch('download', url);
const response = await this.getDownloadResponse(dataInfo);
if (require.isBrowser) {
return await response.arrayBuffer();
} else {
return Buffer.from(await response.arrayBuffer());
}
};

SciServerFiles.prototype.getFileStream = async function(dataInfo) {
const response = await this.getDownloadResponse(dataInfo);
return response.body;
};

SciServerFiles.prototype.putFile = async function (filename, content) {
if (!this.volume) {
throw new Error('Cannot upload file to SciServer. No volume specified.');
@@ -49,6 +52,14 @@ define([
return this.createDataInfo(metadata);
};

SciServerFiles.prototype.putFileStream = async function(filename, stream) {
this.ensureStreamSupport();
this.ensureReadableStream(stream);
await this.putFile(filename, stream);
// stat is needed here because a stream has no byteLength to record in the metadata
return await this.stat(filename);
};

SciServerFiles.prototype.deleteDir = async function (dirname) {
const url = `data/${this.volumePool}/${this.volume}/${dirname}`;
const opts = {method: 'DELETE'};
@@ -120,5 +131,11 @@ define([
return this.createDataInfo(metadata);
};

SciServerFiles.prototype.getDownloadResponse = async function (dataInfo) {
let {volume, filename, volumePool='Storage'} = dataInfo.data;
const url = `file/${volumePool}/${volume}/${filename}`;
return await this.fetch('download', url);
};

return SciServerFiles;
});
5 changes: 5 additions & 0 deletions src/common/storage/index.js
@@ -72,6 +72,11 @@
return client.getFile(dataInfo);
};

Storage.getFileStream = async function(dataInfo, logger, configs) {
const client = await this.getClientForDataInfo(dataInfo, logger, configs);
return client.getFileStream(dataInfo);
};

Storage.deleteFile = async function(dataInfo, logger, configs) {
const client = await this.getClientForDataInfo(dataInfo, logger, configs);
return client.deleteFile(dataInfo);
76 changes: 62 additions & 14 deletions test/integration/StorageBackends.spec.js
@@ -2,27 +2,35 @@
describe('Storage Features Test', function () {
this.timeout(5000);
const assert = require('assert');
const fs = require('fs');
const testFixture = require('../globals');
const {promisify} = require('util');
let {Writable, pipeline} = require('stream');
pipeline = promisify(pipeline);
const {requirejs} = testFixture;
const TEST_STORAGE = 'storageFeaturesSpec';
const TEST_PATH = `${TEST_STORAGE}/dummyFile`;
const TEST_FILE_NAME = 'TestFile';
const CONTENT = 'A Quick Brown Fox Jumped over a lazy Dog';
const logger = testFixture.logger.fork('StorageTests');
const Storage = requirejs('deepforge/storage/index');
const gmeConfig = testFixture.getGmeConfig();
const server = new testFixture.WebGME.standaloneServer(gmeConfig);
server.start = promisify(server.start);
server.stop = promisify(server.stop);
const { StringDecoder } = require('string_decoder');


const storageBackends = Storage.getAvailableBackends();
let StorageConfigs,
client,
clients = {},
dataInfo;
dataInfoBuffer,
dataInfoStream;

before(async function () {
await server.start();
fs.writeFileSync(TEST_FILE_NAME, CONTENT);
StorageConfigs = await testFixture.getStorageConfigs();
for (const backend of storageBackends) {
client = await Storage.getClient(backend, logger, StorageConfigs[backend]);
@@ -35,37 +43,52 @@ describe('Storage Features Test', function () {
for (const backend of storageBackends) {
it(`should putFile using ${backend}`, async function() {
this.retries(maxRetries(backend));
dataInfo = await clients[backend].putFile(TEST_PATH,
Buffer.from('A Quick Brown Fox Jumped over a lazy Dog.'));
dataInfoBuffer = await clients[backend].putFile(TEST_PATH,
Buffer.from(CONTENT));
});

it(`should getFile using ${backend}`, async function() {
this.retries(maxRetries(backend));
await clients[backend].getFile(dataInfo);
await clients[backend].getFile(dataInfoBuffer);
});

it(`should getCachePath using ${backend}`, async () => {
await clients[backend].getCachePath(dataInfo);
await clients[backend].getCachePath(dataInfoBuffer);
});

it(`should stat file using ${backend}`, async () => {
if(backend !== 'gme'){
await clients[backend].stat(TEST_PATH);
} else {
assert.rejects(clients[backend].stat(TEST_PATH), {
name: 'Error',
message: 'stat not implemented for WebGME Blob Storage'
});
}
if(backend !== 'gme'){
await clients[backend].stat(TEST_PATH);
} else {
assert.rejects(clients[backend].stat(TEST_PATH), {
name: 'Error',
message: 'stat not implemented for WebGME Blob Storage'
});
}
});

it(`should putFileStream using ${backend}`, async function () {
this.retries(maxRetries(backend));
const stream = fs.createReadStream(TEST_FILE_NAME);
const pathInStorageBackend = `${TEST_STORAGE}/${TEST_FILE_NAME}`;
dataInfoStream = await clients[backend].putFileStream(pathInStorageBackend, stream);
});

it(`should getFileStream using ${backend}`, async function () {
this.retries(maxRetries(backend));
const inputStream = await clients[backend].getFileStream(dataInfoStream);
await verifyStreamContent(inputStream);
});

it(`should deleteFile using ${backend}`, async function() {
this.retries(maxRetries(backend));
await clients[backend].deleteFile(dataInfo);
await clients[backend].deleteFile(dataInfoBuffer);
await clients[backend].deleteFile(dataInfoStream);
});
}

after(async function () {
fs.unlinkSync(TEST_FILE_NAME);
await server.stop();
});

Expand All @@ -75,4 +98,29 @@ describe('Storage Features Test', function () {
}
return 1;
}

async function verifyStreamContent(inputStream) {
const outputStream = new StringWritable();
await pipeline(inputStream, outputStream);
assert(outputStream.data === CONTENT);
}

class StringWritable extends Writable {
constructor(options) {
super(options);
this._decoder = new StringDecoder(options && options.defaultEncoding);
this.data = '';
}
_write(chunk, encoding, callback) {
if (encoding === 'buffer') {
chunk = this._decoder.write(chunk);
}
this.data += chunk;
callback();
}
_final(callback) {
this.data += this._decoder.end();
callback();
}
}
});
