Skip to content

Commit

Permalink
feat(tasks-pool): a daemon service for posting tasks to rclone server
Browse files Browse the repository at this point in the history
  • Loading branch information
ElonH committed Jun 1, 2020
1 parent b61d8c6 commit c534ff1
Show file tree
Hide file tree
Showing 7 changed files with 231 additions and 4 deletions.
35 changes: 35 additions & 0 deletions src/app/@dataflow/rclone/async-post-flow.ts
@@ -0,0 +1,35 @@
import { AjaxRequest } from 'rxjs/ajax';
import { FlowSupNode, CombErr, AjaxFlowInteralNode } from '../core';
import { IRcloneServer } from '../extra';
import { PostFlow } from './post-flow';

export interface AsyncPostFlowParamsNode {
group?: string;
}
export interface AsyncPostFlowInNode extends IRcloneServer, AsyncPostFlowParamsNode {}

export interface AsyncPostFlowOutNode {
jobid: number;
}

export abstract class AsyncPostFlow<
Tin extends AsyncPostFlowInNode,
Tparms extends AsyncPostFlowParamsNode = AsyncPostFlowParamsNode,
Tsup extends FlowSupNode = Tin & AsyncPostFlowOutNode
> extends PostFlow<Tin, AsyncPostFlowOutNode, Tparms, Tsup> {
// protected cmd: string;
// protected params: Tparms | ((pre: CombErr<Tin>) => Tparms);
// protected cacheSupport: boolean;
// public prerequest$: Observable<CombErr<Tin>>;

protected requestAjax(x: CombErr<Tin>): AjaxRequest {
const res = super.requestAjax(x);
res.body._async = true;
return res;
}
protected reconstructAjaxResult(x: AjaxFlowInteralNode): CombErr<AsyncPostFlowOutNode> {
if (x[1].length !== 0) return [{}, x[1]] as any;
const rsp = x[0].ajaxRsp.response;
return [{ jobid: rsp['jobid'] }, []];
}
}
2 changes: 2 additions & 0 deletions src/app/@dataflow/rclone/index.ts
Expand Up @@ -7,3 +7,5 @@ export * from './connection-flow';
export * from './list-cmd-flow';
export * from './list-group-flow';
export * from './operations-mkdir-flow';
export * from './async-post-flow';
export * from './operations-copyfile-flow';
37 changes: 37 additions & 0 deletions src/app/@dataflow/rclone/operations-copyfile-flow.ts
@@ -0,0 +1,37 @@
import { IRcloneServer } from '../extra';
import { CombErr } from '../core';
import { AsyncPostFlow, AsyncPostFlowParamsNode } from './async-post-flow';

export interface OperationsCopyfileFlowParamsNode extends AsyncPostFlowParamsNode {
/** a remote name string eg “drive:” for the source */
srcFs: string;
/** a path within that remote eg “file.txt” for the source */
srcRemote: string;
/** a remote name string eg “drive2:” for the destination */
dstFs: string;
/** a path within that remote eg “file2.txt” for the destination */
dstRemote: string;
}
export interface OperationsCopyfileFlowInNode
extends OperationsCopyfileFlowParamsNode,
IRcloneServer {}

export abstract class OperationsCopyfileFlow extends AsyncPostFlow<
OperationsCopyfileFlowInNode,
OperationsCopyfileFlowParamsNode
> {
// public prerequest$: Observable<CombErr<OperationsCopyfileFlowInNode>>;
protected cmd: string = 'operations/copyfile';
protected params = (
pre: CombErr<OperationsCopyfileFlowInNode>
): OperationsCopyfileFlowParamsNode => {
if (pre[1].length !== 0) return {} as any;
return {
srcFs: pre[0].srcFs,
srcRemote: pre[0].srcRemote,
dstFs: pre[0].dstFs,
dstRemote: pre[0].dstRemote,
};
};
protected cacheSupport: boolean = false;
}
16 changes: 15 additions & 1 deletion src/app/pages/manager/clipboard/clipboard.service.ts
Expand Up @@ -25,7 +25,13 @@ export class Clipboard {
dst?: NavigationFlowOutNode
) {
const key = Clipboard.genKey(remote, row.Path);
this.data.set(key, { oper: o, key: key, srcItem: { ...row }, srcRemote: remote, dst: dst });
this.data.set(key, {
oper: o,
key: key,
srcItem: { ...row },
srcRemote: remote,
dst: { ...dst },
});
}

public pop(remote: string, path: string): ClipboardItem {
Expand All @@ -52,6 +58,14 @@ export class Clipboard {
return Array.from(this.data.values());
}

public get size(): number {
return this.data.size;
}

public clear() {
this.data.clear();
}

public static genKey(remote: string, path: string) {
return JSON.stringify({ r: remote, p: path });
}
Expand Down
25 changes: 22 additions & 3 deletions src/app/pages/manager/manager.component.ts
Expand Up @@ -5,11 +5,16 @@ import { CombErr } from 'src/app/@dataflow/core';
import { map, withLatestFrom } from 'rxjs/operators';
import { HomeModeComponent } from './homeMode/homeMode.component';
import { NbDialogService } from '@nebular/theme';
import { OperationsMkdirFlow, OperationsMkdirFlowInNode } from 'src/app/@dataflow/rclone';
import {
OperationsMkdirFlow,
OperationsMkdirFlowInNode,
OperationsCopyfileFlow,
} from 'src/app/@dataflow/rclone';
import { ConnectionService } from '../connection.service';
import { NbToastrService } from '@nebular/theme';
import { FileModeComponent } from './fileMode/fileMode.component';
import { ClipboardService } from './clipboard/clipboard.service';
import { TaskService } from './task.service';

@Component({
selector: 'app-manager',
Expand All @@ -35,7 +40,7 @@ import { ClipboardService } from './clipboard/clipboard.service';
<nb-action icon="copy" (click)="file.manipulate('copy')"></nb-action>
<nb-action icon="move" (click)="file.manipulate('move')"></nb-action>
<nb-action icon="trash-2" (click)="file.manipulate('del')"></nb-action>
<nb-action icon="clipboard"></nb-action>
<nb-action icon="clipboard" (click)="paste()"></nb-action>
</nb-actions>
<nb-actions *ngIf="fileMode">
<ng-template #mkdirDialog let-ref="dialogRef">
Expand Down Expand Up @@ -138,7 +143,8 @@ export class ManagerComponent implements OnInit {
private dialogService: NbDialogService,
private connectService: ConnectionService,
private toastrService: NbToastrService,
private clipboard: ClipboardService
private clipboard: ClipboardService,
private taskService: TaskService
) {}
homeMode = false;
fileMode = false;
Expand Down Expand Up @@ -223,10 +229,23 @@ export class ManagerComponent implements OnInit {
});
}

private pasteTrigger = new Subject<number>();
private pasteDeploy() {
this.pasteTrigger.pipe(withLatestFrom(this.nav$.getOutput())).subscribe(([, dstNode]) => {
if (dstNode[1].length !== 0) throw Error("can't not get destination.");
this.taskService.createTask(dstNode[0]);
});
}

paste() {
this.pasteTrigger.next(1);
}

ngOnInit(): void {
this.navDeploy();
this.mkdirDeploy();
this.clipboardDeploy();
this.pasteDeploy();
}

dialog(dialog: TemplateRef<any>) {
Expand Down
16 changes: 16 additions & 0 deletions src/app/pages/manager/task.service.spec.ts
@@ -0,0 +1,16 @@
/* tslint:disable:no-unused-variable */

import { TestBed, async, inject } from '@angular/core/testing';
import { TaskService } from './task.service';

describe('Service: Task', () => {
beforeEach(() => {
TestBed.configureTestingModule({
providers: [TaskService],
});
});

it('should ...', inject([TaskService], (service: TaskService) => {
expect(service).toBeTruthy();
}));
});
104 changes: 104 additions & 0 deletions src/app/pages/manager/task.service.ts
@@ -0,0 +1,104 @@
import { Injectable } from '@angular/core';
import { OperationsCopyfileFlow, OperationsCopyfileFlowInNode } from 'src/app/@dataflow/rclone';
import { ClipboardService, Clipboard, ClipboardItem } from './clipboard/clipboard.service';
import { Subject, from, Observable } from 'rxjs';
import {
withLatestFrom,
takeWhile,
mergeMap,
share,
filter,
map,
zip,
debounceTime,
mapTo,
} from 'rxjs/operators';
import { ConnectionService } from '../connection.service';
import { CombErr, NothingFlow } from 'src/app/@dataflow/core';
import { NavigationFlowOutNode } from 'src/app/@dataflow/extra';

@Injectable({
providedIn: 'root',
})
export class TaskService {
private createTrigger = new Subject<NavigationFlowOutNode>();
public createTask(dst: NavigationFlowOutNode) {
this.createTrigger.next(dst);
}

private deployCreate() {
this.createTrigger
.pipe(withLatestFrom(this.cbService.clipboard$.getOutput()))
.subscribe(([dst, cbNode]) => {
if (cbNode[1].length !== 0) return;
cbNode[0].clipboard.values.forEach((x) =>
this.tasksPool.add(x.oper, x.srcRemote, x.srcItem, dst)
);
this.cbService.clear();
this.cbService.commit();
this.postTrigger.next(1);
});
}

private tasksPool = new Clipboard(); //TODO: rename Clipboard to RngClipboar, for avoiding name conffict
private tasksFailure = new Clipboard();

private postTrigger = new Subject<number>();
private post$: Observable<ClipboardItem>;
public postConcurrentCount = 1;
private emptySlot = this.postConcurrentCount;
private deployPost() {
this.post$ = this.postTrigger.pipe(
takeWhile(() => this.emptySlot > 0 && this.tasksPool.size > 0),
mergeMap(() => {
const tasksAll = this.tasksPool.values;
const tasksPost = tasksAll.slice(0, Math.min(tasksAll.length, this.emptySlot));
this.emptySlot = 0;
return from(tasksPost);
}),
share()
);
}

private copyFile$: OperationsCopyfileFlow;
private deployCopyFile() {
const outer = this;
const taskReal$ = outer.post$.pipe(filter((x) => x.oper === 'copy' && !x.srcItem.IsDir));
this.copyFile$ = new (class extends OperationsCopyfileFlow {
public prerequest$ = taskReal$.pipe(
withLatestFrom(outer.connectService.listCmd$.verify(this.cmd)),
map(
([item, cmdNode]): CombErr<OperationsCopyfileFlowInNode> => {
if (cmdNode[1].length !== 0) return [{}, cmdNode[1]] as any;
return [
{
...cmdNode[0],
srcFs: `${item.srcRemote}:`,
srcRemote: item.srcItem.Path,
dstFs: `${item.dst.remote}:`,
dstRemote: [item.dst.path, item.srcItem.Name].join('/'), // TODO: windows path delimiter '\' ?
},
[],
];
}
)
);
})();
this.copyFile$.deploy();
this.copyFile$
.getOutput()
.pipe(zip(taskReal$))
.subscribe(([x, y]) => {
this.tasksPool.pop(y.srcRemote, y.srcItem.Path); // TODO: replase as del rather pop, for perf
this.emptySlot++;
if (x[1].length !== 0) this.tasksFailure.add(y.oper, y.srcRemote, y.srcItem, y.dst);
this.postTrigger.next(1);
});
}

constructor(private cbService: ClipboardService, private connectService: ConnectionService) {
this.deployCreate();
this.deployPost();
this.deployCopyFile();
}
}

0 comments on commit c534ff1

Please sign in to comment.