Skip to content

Commit

Permalink
Support sleep interval between chunks upload
Browse files Browse the repository at this point in the history
  • Loading branch information
buptsb committed Jan 8, 2024
1 parent 7a8234b commit 9766abb
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 21 deletions.
15 changes: 10 additions & 5 deletions bin/cmd.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,18 @@ function parseSinkType(sinkType) {
return SinkType.memfile;
}
}
throw new Error(`Unkown sink type: ${options.sinkType}`)
throw new Error(`Unknown sink type: ${options.sinkType}`)
}

const program = new Command();

/*
program
.command('test')
.argument('<size>', 'test upload buffer size, e.g. 1024 / 42K / 2M', String)
.argument('<chunkSize>', 'chunk size, e.g. 1024 / 42K / 2M', String)
.option('-s, --sinkType <sinkType>', 'use only this kind of sink', "bili")
.option('-c, --concurrency <concurrency>', 'upload concurrency', "10")
.option('-c, --concurrency <concurrency>', 'concurrency', "1")
.option('-m, --maskPhotoFilePath <maskPhotoFilePath>', 'maskPhotoFilePath', String)
.option('-u, --usedBits <usedBist>', 'usedBist', String)
.option('--no-validate', 'do not do validation')
Expand All @@ -61,13 +62,15 @@ program
);
console.log(await f.GenerateDescription());
});
*/

program
.command('upload')
.argument('<filePath>', 'upload file', String)
.argument('<chunkSize>', 'chunk size, e.g. 1024 / 42K / 2M', String)
.option('-s, --sinkType <sinkType>', 'use only this kind of sink', "bili")
.option('-c, --concurrency <concurrency>', 'upload concurrency', "10")
.option('-c, --concurrency <concurrency>', 'concurrency', "1")
.option('-i, --interval <sleep_interval>', 'interval', "200")
.option('-m, --maskPhotoFilePath <maskPhotoFilePath>', 'maskPhotoFilePath', String)
.option('--no-validate', 'do not do validation')
.action(async (filePath, chunkSize, options) => {
Expand All @@ -80,14 +83,16 @@ program
fs,
sinkType,
options.maskPhotoFilePath,
"",
parseInt(options.interval),
);
console.log(await f.GenerateDescription());
console.log(await f.GenerateDescription(true));
});

program
.command('download')
.argument('<desc>', 'desc string', String)
.option('-c, --concurrency <concurrency>', 'upload concurrency', "10")
.option('-c, --concurrency <concurrency>', 'concurrency', "10")
.action(async (desc, options) => {
const f = await DownloadFile.Create(
desc,
Expand Down
2 changes: 2 additions & 0 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ export class SinkUploadConfig extends ConfigBase {
public encoderType: EncoderType,
public sinkType: SinkType,
signal: AbortSignal,
public sleepInterval: number,
) {
super(usedBits, cipherConfig, concurrency, signal);
}
Expand All @@ -64,6 +65,7 @@ export class SinkUploadConfig extends ConfigBase {
this.encoderType,
this.sinkType,
this.signal,
this.sleepInterval,
);
}

Expand Down
20 changes: 12 additions & 8 deletions src/file.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,19 @@ const debugLogger = debug('jpeg:file');
class BaseFile {
constructor() { }

async uploadBuffer(buf: Buffer, uploadConfig: SinkUploadConfig) {
const { filePtrs } = await sinkDelegate.UploadMultiple(() => buf, 1, uploadConfig);
async uploadBuffer(buf: Buffer, uploadConfig: SinkUploadConfig, uploadMsg: string) {
const { filePtrs } = await sinkDelegate.UploadMultiple(() => buf, 1, uploadConfig, uploadMsg);
assert(filePtrs.length === 1);
return filePtrs[0];
}

async upload<T extends UnknownMessage>(msg: T, uploadConfig: SinkUploadConfig) {
async upload<T extends UnknownMessage>(msg: T, uploadConfig: SinkUploadConfig, uploadMsg: string) {
const buf = Buffer.from(messageTypeRegistry.get(msg.$type)!.encode(msg).finish());
return this.uploadBuffer(buf, uploadConfig);
return this.uploadBuffer(buf, uploadConfig, uploadMsg);
}

async download<T extends UnknownMessage>(msgTyp: MessageType, ptr: PbFilePointer, downloadConfig: SinkDownloadConfig) {
const buf = await sinkDelegate.DownloadSingleFile(ptr, downloadConfig);
const buf = await sinkDelegate.DownloadSingle(ptr, downloadConfig);
return msgTyp.decode(buf) as T;
}
}
Expand Down Expand Up @@ -70,7 +70,7 @@ class IndexFile extends BaseFile {
next: undefined,
}
this.log("Gen from:", indexFile);
const indexFilePtr = await this.upload(indexFile, uploadConfig);
const indexFilePtr = await this.upload(indexFile, uploadConfig, "indexFile");
this.log("Gen result: ", indexFilePtr);
return indexFilePtr;
}
Expand Down Expand Up @@ -153,7 +153,7 @@ class BootloaderFile extends BaseFile {
checksum,
};
this.log("Gen from: ", blFile);
const bootloaderFilePtr = await this.upload(blFile, uploadConfig);
const bootloaderFilePtr = await this.upload(blFile, uploadConfig, "bootloaderFile");
return GenDescString(bootloaderFilePtr, blPassword);
}

Expand Down Expand Up @@ -212,6 +212,7 @@ export class UploadFile {
public sinkType: SinkType = SinkType.unknown,
maskPhotoFilePath: string = "",
usedBitsString: string = "",
sleepInterval: number = 0,
) {
assert(chunkSize > 0);

Expand All @@ -238,6 +239,7 @@ export class UploadFile {
EncoderType.jpegjsEncoder,
sinkType, // sinkType
null,
0,
);
this.dataUploadConfig = new SinkUploadConfig(
usedBits, // usedBits
Expand All @@ -248,6 +250,7 @@ export class UploadFile {
EncoderType.jpegjsEncoder,
sinkType, // sinkType
null,
sleepInterval,
);
this.log(
"Upload start with filePath/chunkSize/concurrency/validate/sinkType/maskPhoto:",
Expand Down Expand Up @@ -275,7 +278,8 @@ export class UploadFile {
const { filePtrs, totalUploadSize } = await sinkDelegate.UploadMultiple(
readChunk,
this.n_chunks,
this.dataUploadConfig
this.dataUploadConfig,
"chunks"
);

// step 2. create/upload index file(s)
Expand Down
13 changes: 10 additions & 3 deletions src/sinks/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ import crypto from 'crypto';

const log = debug('jpeg:sinks');

async function sleep(t: number) {
return new Promise(resolve => setTimeout(resolve, t));
}

class SinkDelegate {
private sinks = [
new WeiboSink(),
Expand Down Expand Up @@ -46,10 +50,10 @@ class SinkDelegate {
return sink;
}

async UploadMultiple(getBuf: (index: number) => Buffer, totalLength: number, config: SinkUploadConfig) {
async UploadMultiple(getBuf: (index: number) => Buffer, totalLength: number, config: SinkUploadConfig, uploadMsg: string) {
const { task, source$, pool, abortCtr } = RxTask.Create(totalLength, config.concurrency);
const usedConfig = config.cloneWithSignal(abortCtr.signal);
log("upload multiple with config", usedConfig);
log(`upload [${uploadMsg}] with config`, usedConfig);

const ob = source$.pipe(
task.createUnlimitedTasklet(async (index: number) => {
Expand All @@ -72,6 +76,9 @@ class SinkDelegate {
}),
task.createLimitedTasklet(async (params) => {
const urlString = await params.sink.DoUpload(params.encoded, config);
if (config.sleepInterval > 0) {
await sleep(config.sleepInterval);
}
const resourceID: PbResourceURL = {
$type: PbResourceURL.$type,
urlOneof: {
Expand Down Expand Up @@ -102,7 +109,7 @@ class SinkDelegate {
};
}

async DownloadSingleFile(chunk: PbFilePointer, config: SinkDownloadConfig) {
async DownloadSingle(chunk: PbFilePointer, config: SinkDownloadConfig) {
const cached = await firstValueFrom(this.DownloadMultiple([chunk], config));
return cached.decoded;
}
Expand Down
8 changes: 4 additions & 4 deletions tests/units/sink.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { EncoderType, SinkType } from "../../src/common-types";

const usedBits = new UsedBits(1, 4);
const cipherConfig = NewCipherConfigFromPassword(new Uint8Array(randomBytesArray(32)));
const uploadConfig = new SinkUploadConfig(usedBits, cipherConfig, 4, false, "", EncoderType.wasmEncoder, SinkType.memfile, null);
const uploadConfig = new SinkUploadConfig(usedBits, cipherConfig, 4, false, "", EncoderType.wasmEncoder, SinkType.memfile, null, 0);

async function uploadDownloadLoop(sink: BasicSink) {
const buf = Buffer.from(randomBytesArray(1024).buffer);
Expand All @@ -24,14 +24,14 @@ test("memfile upload/download loop", async () => {

test("chunk has incorrect checksum", async () => {
const buf = Buffer.from(randomBytesArray(1024).buffer);
const { filePtrs } = await sinkDelegate.UploadMultiple(() => buf, 1, uploadConfig);
const { filePtrs } = await sinkDelegate.UploadMultiple(() => buf, 1, uploadConfig, "test");
const fp = filePtrs[0];

const downloaded = await sinkDelegate.DownloadSingleFile(fp, uploadConfig.toDownloadConfig());
const downloaded = await sinkDelegate.DownloadSingle(fp, uploadConfig.toDownloadConfig());
expect(downloaded).toEqual(buf);

const incorrectChecksumFp = fp;
incorrectChecksumFp.checksum = Buffer.from("incorrect checksum");
const promise = sinkDelegate.DownloadSingleFile(incorrectChecksumFp, uploadConfig.toDownloadConfig());
const promise = sinkDelegate.DownloadSingle(incorrectChecksumFp, uploadConfig.toDownloadConfig());
await expect(promise).rejects.toThrow(/checksum mismatch/);
});
2 changes: 1 addition & 1 deletion tests/units/workers.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { EncoderType, DecoderType } from "../../src/common-types";
const usedBits = new UsedBits(1, 4);
const cipherConfig = NewCipherConfigFromPassword(new Uint8Array(randomBytesArray(32)));
const downloadConfig = new SinkDownloadConfig(usedBits, cipherConfig, 4, DecoderType.wasmDecoder, null);
const uploadConfig = new SinkUploadConfig(usedBits, cipherConfig, 4, false, "", EncoderType.wasmEncoder, null, null);
const uploadConfig = new SinkUploadConfig(usedBits, cipherConfig, 4, false, "", EncoderType.wasmEncoder, null, null, 0);

async function doEncryptEncode(ab: ArrayBuffer) {
const pool = new WorkerPool();
Expand Down

0 comments on commit 9766abb

Please sign in to comment.