Skip to content

Commit

Permalink
- Remove the distinction between sync/async readFileFn in order to use a
Browse files Browse the repository at this point in the history
 unified codepath.
- Remove the need for calling the preprocessFinished() function after defining
  a (now async) preprocess() callback.
  • Loading branch information
Raphaël Droz authored and drzraf committed Apr 5, 2023
1 parent b0dffad commit 3388ab4
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 65 deletions.
4 changes: 2 additions & 2 deletions samples/Encryption.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ const flow = new Flow({
uploadMethod: 'POST',
fileParameterName: 'file',
// Asynchronous function called before each chunk upload request
asyncReadFileFn: async function(flowObj, startByte, endByte, fileType, chunk) {
readFileFn: async function(flowObj, startByte, endByte, fileType, chunk) {
// Load file chunk in memory
const plaintextbytes = await readFileChunk(flowObj.file, startByte, endByte);
// Encrypt chunk
Expand Down Expand Up @@ -109,7 +109,7 @@ class StreamEncryptor {
var encryptor = new StreamEncryptor(gpgKeys);
new Flow({
// ...
asyncReadFileFn: encryptor.read.bind(encryptor),
readFileFn: encryptor.read.bind(encryptor),
initFileFn: encryptor.init.bind(encryptor),
forceChunkSize: true,
});
Expand Down
70 changes: 15 additions & 55 deletions src/FlowChunk.js
Original file line number Diff line number Diff line change
Expand Up @@ -260,30 +260,7 @@ export default class FlowChunk {
}

/**
* Finish preprocess state
* @function
*/
async preprocessFinished() {
// Re-compute the endByte after the preprocess function to allow an
// implementer of preprocess to set the fileObj size
this.endByte = this.computeEndByte();

this.preprocessState = 2;
await this.send();
}

/**
* Finish read state
* @function
*/
readFinished(payload) {
this.readState = 2;
this.payload = payload;
this.send();
}

/**
* asyncReadFileFn() helper provides the ability of asynchronous read()
* readFileFn() helper provides the ability of asynchronous read()
* Eg: When reading from a ReadableStream.getReader().
*
* But:
Expand Down Expand Up @@ -322,15 +299,17 @@ export default class FlowChunk {

this.readState = 1;
await this.readStreamGuard();
var data, asyncRead = this.flowObj.opts.asyncReadFileFn;

data = await asyncRead(this.fileObj, this.startByte, this.endByte, this.fileObj.file.type, this);
let data = await this.flowObj.opts.readFileFn(this.fileObj, this.startByte, this.endByte, this.fileObj.file.type, this);
this.readStreamState.resolve();

// Equivalent to readFinished()
// Equivalent to former readFinished()
this.readState = 2;

if (data) {
if (typeof data === 'string') { // In case a regular string is returned
data = new Blob([data], {type: 'application/octet-stream'});
}

this.readBytes = data.size || data.size === 0 ? data.size : -1;
}

Expand Down Expand Up @@ -374,36 +353,17 @@ export default class FlowChunk {
*/
async send() {
var preprocess = this.flowObj.opts.preprocess;
var read = this.flowObj.opts.readFileFn;
var asyncRead = this.flowObj.opts.asyncReadFileFn;

if (typeof preprocess === 'function') {
switch (this.preprocessState) {
case 0:
this.preprocessState = 1;
preprocess(this);
return;
case 1:
return;
}
}

if (asyncRead) {
await this.readStreamChunk();
return;
}

switch (this.readState) {
case 0:
this.readState = 1;
const data = read(this.fileObj, this.startByte, this.endByte, this.fileObj.file.type, this);
this.readFinished(data);
return;
case 1:
return;
this.preprocessState = 1;
await preprocess(this);
// Finish preprocess state
// Re-compute the endByte after the preprocess function to allow an
// implementer of preprocess to set the fileObj size
this.endByte = this.computeEndByte();
this.preprocessState = 2;
}

this.xhrSend(this.payload);
await this.readStreamChunk();
}

/**
Expand Down
10 changes: 5 additions & 5 deletions test/asyncSpec.js
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ describe('upload stream', function() {
xhr_server.restore();
});

it('synchronous initFileFn and asyncReadFileFn', function (done) {
it('synchronous initFileFn and async readFileFn', function (done) {
// No File.stream() support : No test
// No support for skipping() test from Jasmine (https://github.com/jasmine/jasmine/issues/1709)
if (typeof Blob === 'undefined' || Blob.prototype.stream !== 'function') {
Expand Down Expand Up @@ -145,7 +145,7 @@ describe('upload stream', function() {
flow.opts.simultaneousUploads = simultaneousUploads;
flow.opts.initFileFn = streamer.init.bind(streamer);
flow.opts.readFileFn = streamer.read.bind(streamer);
flow.opts.asyncReadFileFn = streamer.read.bind(streamer);

(async function() {
await flow.addFile(sample_file);
await flow.upload();
Expand Down Expand Up @@ -208,7 +208,7 @@ describe('upload stream', function() {

var streamer = new Streamer(1);
flow.opts.initFileFn = streamer.init.bind(streamer);
flow.opts.asyncReadFileFn = streamer.read.bind(streamer);
flow.opts.readFileFn = streamer.read.bind(streamer);

flow.opts.chunkSize = 1;
flow.opts.maxChunkRetries = 3;
Expand Down Expand Up @@ -242,7 +242,7 @@ describe('upload stream', function() {
xhr_server.respondWith([200, { "Content-Type": "text/plain" }, 'ok']);
var streamer = new Streamer(1);
flow.opts.initFileFn = streamer.init.bind(streamer);
flow.opts.asyncReadFileFn = streamer.read.bind(streamer);
flow.opts.readFileFn = streamer.read.bind(streamer);

flow.opts.chunkSize = 1;
flow.opts.maxChunkRetries = 3;
Expand Down Expand Up @@ -283,7 +283,7 @@ describe('upload stream', function() {
xhr_server.configure({autoRespond: false, respondImmediately: false});
var streamer = new Streamer(1);
flow.opts.initFileFn = streamer.init.bind(streamer);
flow.opts.asyncReadFileFn = streamer.read.bind(streamer);
flow.opts.readFileFn = streamer.read.bind(streamer);

flow.opts.chunkSize = 1;
flow.opts.maxChunkRetries = 3;
Expand Down
4 changes: 1 addition & 3 deletions test/uploadSpec.js
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,6 @@ describe('upload file', function() {
expect(xhr.requests.length).toBe(0);
expect(preprocess).toHaveBeenCalledWith(file.chunks[0]);
expect(file.chunks[0].preprocessState).toBe(1);
file.chunks[0].preprocessFinished();
expect(xhr.requests.length).toBe(1);
xhr.requests[0].respond(200, [], "response");
expect(success).toHaveBeenCalledWith(asCustomEvent(file, "response", file.chunks[0]));
Expand All @@ -416,7 +415,7 @@ describe('upload file', function() {
var file = flow.files[0];
var secondFile = flow.files[1];
await flow.upload();
expect(xhr.requests.length).toBe(0);
expect(xhr.requests.length).toBe(1);
expect(preprocess).toHaveBeenCalledWith(file.chunks[0]);
expect(preprocess).not.toHaveBeenCalledWith(secondFile.chunks[0]);

Expand All @@ -439,7 +438,6 @@ describe('upload file', function() {
await flow.upload();
for(var i=0; i<file.chunks.length; i++) {
expect(preprocess).toHaveBeenCalledWith(file.chunks[i]);
file.chunks[i].preprocessFinished();
await file.pause();
await file.resume();
xhr.requests[xhr.requests.length-1].respond(200, [], "response");
Expand Down

0 comments on commit 3388ab4

Please sign in to comment.