From c534ff1b0a6ee3ed37069e9b85176a60704f5e79 Mon Sep 17 00:00:00 2001 From: ElonH Date: Mon, 1 Jun 2020 13:49:35 +0800 Subject: [PATCH] feat(tasks-pool): a daemon service for posting tasks to rclone server --- src/app/@dataflow/rclone/async-post-flow.ts | 35 ++++++ src/app/@dataflow/rclone/index.ts | 2 + .../rclone/operations-copyfile-flow.ts | 37 +++++++ .../manager/clipboard/clipboard.service.ts | 16 ++- src/app/pages/manager/manager.component.ts | 25 ++++- src/app/pages/manager/task.service.spec.ts | 16 +++ src/app/pages/manager/task.service.ts | 104 ++++++++++++++++++ 7 files changed, 231 insertions(+), 4 deletions(-) create mode 100644 src/app/@dataflow/rclone/async-post-flow.ts create mode 100644 src/app/@dataflow/rclone/operations-copyfile-flow.ts create mode 100644 src/app/pages/manager/task.service.spec.ts create mode 100644 src/app/pages/manager/task.service.ts diff --git a/src/app/@dataflow/rclone/async-post-flow.ts b/src/app/@dataflow/rclone/async-post-flow.ts new file mode 100644 index 0000000..6f999c9 --- /dev/null +++ b/src/app/@dataflow/rclone/async-post-flow.ts @@ -0,0 +1,35 @@ +import { AjaxRequest } from 'rxjs/ajax'; +import { FlowSupNode, CombErr, AjaxFlowInteralNode } from '../core'; +import { IRcloneServer } from '../extra'; +import { PostFlow } from './post-flow'; + +export interface AsyncPostFlowParamsNode { + group?: string; +} +export interface AsyncPostFlowInNode extends IRcloneServer, AsyncPostFlowParamsNode {} + +export interface AsyncPostFlowOutNode { + jobid: number; +} + +export abstract class AsyncPostFlow< + Tin extends AsyncPostFlowInNode, + Tparms extends AsyncPostFlowParamsNode = AsyncPostFlowParamsNode, + Tsup extends FlowSupNode = Tin & AsyncPostFlowOutNode +> extends PostFlow { + // protected cmd: string; + // protected params: Tparms | ((pre: CombErr) => Tparms); + // protected cacheSupport: boolean; + // public prerequest$: Observable>; + + protected requestAjax(x: CombErr): AjaxRequest { + const res = super.requestAjax(x); + res.body._async = true; + return res; + } + protected reconstructAjaxResult(x: AjaxFlowInteralNode): CombErr { + if (x[1].length !== 0) return [{}, x[1]] as any; + const rsp = x[0].ajaxRsp.response; + return [{ jobid: rsp['jobid'] }, []]; + } +} diff --git a/src/app/@dataflow/rclone/index.ts b/src/app/@dataflow/rclone/index.ts index 4cab671..2747c88 100644 --- a/src/app/@dataflow/rclone/index.ts +++ b/src/app/@dataflow/rclone/index.ts @@ -7,3 +7,5 @@ export * from './connection-flow'; export * from './list-cmd-flow'; export * from './list-group-flow'; export * from './operations-mkdir-flow'; +export * from './async-post-flow'; +export * from './operations-copyfile-flow'; diff --git a/src/app/@dataflow/rclone/operations-copyfile-flow.ts b/src/app/@dataflow/rclone/operations-copyfile-flow.ts new file mode 100644 index 0000000..2143a3e --- /dev/null +++ b/src/app/@dataflow/rclone/operations-copyfile-flow.ts @@ -0,0 +1,37 @@ +import { IRcloneServer } from '../extra'; +import { CombErr } from '../core'; +import { AsyncPostFlow, AsyncPostFlowParamsNode } from './async-post-flow'; + +export interface OperationsCopyfileFlowParamsNode extends AsyncPostFlowParamsNode { + /** a remote name string eg “drive:” for the source */ + srcFs: string; + /** a path within that remote eg “file.txt” for the source */ + srcRemote: string; + /** a remote name string eg “drive2:” for the destination */ + dstFs: string; + /** a path within that remote eg “file2.txt” for the destination */ + dstRemote: string; +} +export interface OperationsCopyfileFlowInNode + extends OperationsCopyfileFlowParamsNode, + IRcloneServer {} + +export abstract class OperationsCopyfileFlow extends AsyncPostFlow< + OperationsCopyfileFlowInNode, + OperationsCopyfileFlowParamsNode +> { + // public prerequest$: Observable>; + protected cmd: string = 'operations/copyfile'; + protected params = ( + pre: CombErr + ): OperationsCopyfileFlowParamsNode => { + if (pre[1].length !== 0) return {} as any; + return { + srcFs: pre[0].srcFs, + srcRemote: pre[0].srcRemote, + dstFs: pre[0].dstFs, + dstRemote: pre[0].dstRemote, + }; + }; + protected cacheSupport: boolean = false; +} diff --git a/src/app/pages/manager/clipboard/clipboard.service.ts b/src/app/pages/manager/clipboard/clipboard.service.ts index 063d81f..1854b4d 100644 --- a/src/app/pages/manager/clipboard/clipboard.service.ts +++ b/src/app/pages/manager/clipboard/clipboard.service.ts @@ -25,7 +25,13 @@ export class Clipboard { dst?: NavigationFlowOutNode ) { const key = Clipboard.genKey(remote, row.Path); - this.data.set(key, { oper: o, key: key, srcItem: { ...row }, srcRemote: remote, dst: dst }); + this.data.set(key, { + oper: o, + key: key, + srcItem: { ...row }, + srcRemote: remote, + dst: { ...dst }, + }); } public pop(remote: string, path: string): ClipboardItem { @@ -52,6 +58,14 @@ export class Clipboard { return Array.from(this.data.values()); } + public get size(): number { + return this.data.size; + } + + public clear() { + this.data.clear(); + } + public static genKey(remote: string, path: string) { return JSON.stringify({ r: remote, p: path }); } diff --git a/src/app/pages/manager/manager.component.ts b/src/app/pages/manager/manager.component.ts index 5e81e13..3ed752f 100644 --- a/src/app/pages/manager/manager.component.ts +++ b/src/app/pages/manager/manager.component.ts @@ -5,11 +5,16 @@ import { CombErr } from 'src/app/@dataflow/core'; import { map, withLatestFrom } from 'rxjs/operators'; import { HomeModeComponent } from './homeMode/homeMode.component'; import { NbDialogService } from '@nebular/theme'; -import { OperationsMkdirFlow, OperationsMkdirFlowInNode } from 'src/app/@dataflow/rclone'; +import { + OperationsMkdirFlow, + OperationsMkdirFlowInNode, + OperationsCopyfileFlow, +} from 'src/app/@dataflow/rclone'; import { ConnectionService } from '../connection.service'; import { NbToastrService } from '@nebular/theme'; import { FileModeComponent } from './fileMode/fileMode.component'; import { ClipboardService } from './clipboard/clipboard.service'; +import { TaskService } from './task.service'; @Component({ selector: 'app-manager', @@ -35,7 +40,7 @@ import { ClipboardService } from './clipboard/clipboard.service'; - + @@ -138,7 +143,8 @@ export class ManagerComponent implements OnInit { private dialogService: NbDialogService, private connectService: ConnectionService, private toastrService: NbToastrService, - private clipboard: ClipboardService + private clipboard: ClipboardService, + private taskService: TaskService ) {} homeMode = false; fileMode = false; @@ -223,10 +229,23 @@ export class ManagerComponent implements OnInit { }); } + private pasteTrigger = new Subject(); + private pasteDeploy() { + this.pasteTrigger.pipe(withLatestFrom(this.nav$.getOutput())).subscribe(([, dstNode]) => { + if (dstNode[1].length !== 0) throw Error("can't not get destination."); + this.taskService.createTask(dstNode[0]); + }); + } + + paste() { + this.pasteTrigger.next(1); + } + ngOnInit(): void { this.navDeploy(); this.mkdirDeploy(); this.clipboardDeploy(); + this.pasteDeploy(); } dialog(dialog: TemplateRef) { diff --git a/src/app/pages/manager/task.service.spec.ts b/src/app/pages/manager/task.service.spec.ts new file mode 100644 index 0000000..1fd1755 --- /dev/null +++ b/src/app/pages/manager/task.service.spec.ts @@ -0,0 +1,16 @@ +/* tslint:disable:no-unused-variable */ + +import { TestBed, async, inject } from '@angular/core/testing'; +import { TaskService } from './task.service'; + +describe('Service: Task', () => { + beforeEach(() => { + TestBed.configureTestingModule({ + providers: [TaskService], + }); + }); + + it('should ...', inject([TaskService], (service: TaskService) => { + expect(service).toBeTruthy(); + })); +}); diff --git a/src/app/pages/manager/task.service.ts b/src/app/pages/manager/task.service.ts new file mode 100644 index 0000000..96ac448 --- /dev/null +++ b/src/app/pages/manager/task.service.ts @@ -0,0 +1,104 @@ +import { Injectable } from '@angular/core'; +import { OperationsCopyfileFlow, OperationsCopyfileFlowInNode } from 'src/app/@dataflow/rclone'; +import { ClipboardService, Clipboard, ClipboardItem } from './clipboard/clipboard.service'; +import { Subject, from, Observable } from 'rxjs'; +import { + withLatestFrom, + takeWhile, + mergeMap, + share, + filter, + map, + zip, + debounceTime, + mapTo, +} from 'rxjs/operators'; +import { ConnectionService } from '../connection.service'; +import { CombErr, NothingFlow } from 'src/app/@dataflow/core'; +import { NavigationFlowOutNode } from 'src/app/@dataflow/extra'; + +@Injectable({ + providedIn: 'root', +}) +export class TaskService { + private createTrigger = new Subject(); + public createTask(dst: NavigationFlowOutNode) { + this.createTrigger.next(dst); + } + + private deployCreate() { + this.createTrigger + .pipe(withLatestFrom(this.cbService.clipboard$.getOutput())) + .subscribe(([dst, cbNode]) => { + if (cbNode[1].length !== 0) return; + cbNode[0].clipboard.values.forEach((x) => + this.tasksPool.add(x.oper, x.srcRemote, x.srcItem, dst) + ); + this.cbService.clear(); + this.cbService.commit(); + this.postTrigger.next(1); + }); + } + + private tasksPool = new Clipboard(); //TODO: rename Clipboard to RngClipboar, for avoiding name conffict + private tasksFailure = new Clipboard(); + + private postTrigger = new Subject(); + private post$: Observable; + public postConcurrentCount = 1; + private emptySlot = this.postConcurrentCount; + private deployPost() { + this.post$ = this.postTrigger.pipe( + takeWhile(() => this.emptySlot > 0 && this.tasksPool.size > 0), + mergeMap(() => { + const tasksAll = this.tasksPool.values; + const tasksPost = tasksAll.slice(0, Math.min(tasksAll.length, this.emptySlot)); + this.emptySlot = 0; + return from(tasksPost); + }), + share() + ); + } + + private copyFile$: OperationsCopyfileFlow; + private deployCopyFile() { + const outer = this; + const taskReal$ = outer.post$.pipe(filter((x) => x.oper === 'copy' && !x.srcItem.IsDir)); + this.copyFile$ = new (class extends OperationsCopyfileFlow { + public prerequest$ = taskReal$.pipe( + withLatestFrom(outer.connectService.listCmd$.verify(this.cmd)), + map( + ([item, cmdNode]): CombErr => { + if (cmdNode[1].length !== 0) return [{}, cmdNode[1]] as any; + return [ + { + ...cmdNode[0], + srcFs: `${item.srcRemote}:`, + srcRemote: item.srcItem.Path, + dstFs: `${item.dst.remote}:`, + dstRemote: [item.dst.path, item.srcItem.Name].join('/'), // TODO: windows path delimiter '\' ? + }, + [], + ]; + } + ) + ); + })(); + this.copyFile$.deploy(); + this.copyFile$ + .getOutput() + .pipe(zip(taskReal$)) + .subscribe(([x, y]) => { + this.tasksPool.pop(y.srcRemote, y.srcItem.Path); // TODO: replase as del rather pop, for perf + this.emptySlot++; + if (x[1].length !== 0) this.tasksFailure.add(y.oper, y.srcRemote, y.srcItem, y.dst); + this.postTrigger.next(1); + }); + } + + constructor(private cbService: ClipboardService, private connectService: ConnectionService) { + this.deployCreate(); + this.deployPost(); + this.deployCopyFile(); + } +}