Skip to content

Commit

Permalink
feat: add demo
Browse files Browse the repository at this point in the history
  • Loading branch information
everbrez committed Jan 14, 2024
1 parent cd6f49a commit bc1b375
Show file tree
Hide file tree
Showing 13 changed files with 1,003 additions and 22 deletions.
9 changes: 8 additions & 1 deletion packages/core/src/operators/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,14 @@
// 之后可以区分为操作流和数据流

// ================ streams operators ================= //
export { constValue, sum, proxyData, combine, transform } from './streams';
export {
constValue,
sum,
proxyData,
combine,
transform,
merge,
} from './streams';

// ================ values operators ================= //
export { sumValue } from './values';
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/operators/streams/combine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ export function combine<
) {
const result = new ModelState<any[]>([]);

streamSource.forEach((input, index) => {
streamSource.forEach((input) => {
input.subscribe((_value, extraInfo) => {
const allValue = [...streamSource, ...appendSource].map(
(item) => item.current,
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/operators/streams/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ export { proxyData } from './proxyData';
export { sum } from './sum';
export { combine } from './combine';
export { transform } from './transform';
export { merge } from './merge';
16 changes: 16 additions & 0 deletions packages/core/src/operators/streams/merge.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import { type Atom } from '../../ModelState/State';
import { ModelState } from '../..';

export function merge<ObservableSubjectList extends Array<Atom<any>>>(
streamSource: ObservableSubjectList,
) {
const result = new ModelState<any[]>([]);

streamSource.forEach((input) => {
input.subscribe((value, extraInfo) => {
result.update(value, extraInfo.concat('combine'));
});
});

return result;
}
9 changes: 7 additions & 2 deletions packages/core/src/operators/streams/transform.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,13 @@ export function transform<StreamValue, ResultValue>(
return steam.pipe((observer) => {
const result = new ModelState<ResultValue | undefined>(undefined);

observer.subscribe((value, extra) => {
result.update(action(value), extra.concat('transform'));
observer.subscribe(async (value, extra) => {
const actionResult = action(value);
let resultValue = actionResult;
if (actionResult instanceof Promise) {
resultValue = await actionResult;
}
result.update(resultValue, extra.concat('transform'));
});

return result;
Expand Down
9 changes: 8 additions & 1 deletion packages/flow/src/Diagrams/Compiler/graph.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,14 @@ export class NodeGraph {
for (const edge of this.edges) {
if (edge.source && edge.target) {
const currentDegree = degreeMap.get(edge.target) || 0;
degreeMap.set(edge.target, currentDegree + 1);
const targetNode = this.nodeMap.get(edge.target);
const ignoreDegreeIds = targetNode
? getOperatorFromNode(targetNode)?.getIgnoreDegreeIds?.(targetNode) ||
[]
: [];
if (!ignoreDegreeIds.includes(edge.targetHandle || '')) {
degreeMap.set(edge.target, currentDegree + 1);
}
}
}

Expand Down
35 changes: 21 additions & 14 deletions packages/flow/src/Diagrams/Operators/CombineOperator/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -94,26 +94,33 @@ export class CombineOperator
this.getHintPorts(node, 'output')?.find((item) => item.hint === 'output')
?.id || '';

const mainInputSet = new Set(
this.getMainInputPorts(node)?.map((item) => item.id),
);
const appendInputSet = new Set(
this.getAppendInputPorts(node)?.map((item) => item.id),
);
const mainInputList =
this.getMainInputPorts(node)?.map((item) => item.id) || [];

const appendInputList =
this.getAppendInputPorts(node)?.map((item) => item.id) || [];

const sourceItems = nodeGraph.findSourceNodes(node.id) || [];
const mainInputSourceIds = sourceItems
.filter((item) => mainInputSet.has(item.handleId))
.map((item) => item.relatedHandleId);
const sourceItemMap = new Map(
sourceItems.map((item) => [item.handleId, item]),
);

const mainInputSourceIds = mainInputList
.map((id) => sourceItemMap.get(id))
.map((item) => item?.relatedHandleId);

const appendInputSourceIds = sourceItems
.filter((item) => appendInputSet.has(item.handleId))
.map((item) => item.relatedHandleId);
const appendInputSourceIds = appendInputList
.map((id) => sourceItemMap.get(id))
.map((item) => item?.relatedHandleId);

return [
`const ${formatVariableName(handleId)} = ${EosOperatorsSymbol}.combine(
[${mainInputSourceIds.map((id) => formatVariableName(id)).join(',')}],
[${appendInputSourceIds.map((id) => formatVariableName(id)).join(',')}]
[${mainInputSourceIds
.map((id) => (id ? formatVariableName(id) : 'undefined'))
.join(',')}],
[${appendInputSourceIds
.map((id) => (id ? formatVariableName(id) : 'undefined'))
.join(',')}]
)`,
];
}
Expand Down
141 changes: 141 additions & 0 deletions packages/flow/src/Diagrams/Operators/MergeOperator/index.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
import { type Node } from 'reactflow';
import { NodeTypeEnum } from '../../Nodes/NodeTypeEnum';
import { MetaOperator } from '../Operator';
import { EndPoint, type IMergeOperatorData } from '../types';
import { type IGenerationOption } from '../../Compiler/graph';
import { EosCoreSymbol, EosOperatorsSymbol } from '../../Compiler/runtime';

export class MergeOperator
extends MetaOperator<IMergeOperatorData>
implements MetaOperator<IMergeOperatorData>
{
nodeColor?: string | undefined = '#C21292';

constructor() {
super({
operatorName: 'Merge',
operatorType: 'MergeOperator',
nodeType: NodeTypeEnum.Node,
});
}

create(): Node<IMergeOperatorData<Record<string, any>>> {
return super.create({
endPointOptions: {
endPointList: [
new EndPoint({
type: 'group',
hint: 'output',
children: [
new EndPoint({
label: 'output',
hint: 'output',
variableType: 'array',
type: 'source',
}),
],
}),
new EndPoint({
type: 'group',
hint: 'input',
label: 'Stream',
defaultChildData: {
hint: 'input',
type: 'target',
ignoreDegree: true,
},
allowAddAndRemoveChildren: true,
children: [
new EndPoint({
hint: 'input',
type: 'target',
ignoreDegree: true,
}),
],
}),
],
},
});
}

getHintPorts(node: Node<IMergeOperatorData>, hint: string) {
return node.data.endPointOptions?.endPointList?.find(
(item) => item.hint === hint,
)?.children;
}

getIgnoreDegreeIds(
node: Node<IMergeOperatorData<Record<string, any>>>,
): string[] {
const mainInputList =
this.getHintPorts(node, 'input')?.map((item) => item.id) || [];
return mainInputList;
}

generateBlockDeclarations(
options: IGenerationOption<IMergeOperatorData<Record<string, any>>>,
): string[] {
const { node, formatVariableName } = options;

const handleId =
this.getHintPorts(node, 'output')?.find((item) => item.hint === 'output')
?.id || '';

const mainInputList =
this.getHintPorts(node, 'input')?.map((item) => item.id) || [];
return [
...mainInputList.map(
(inputId) =>
`const ${formatVariableName(
inputId,
)} = new ${EosCoreSymbol}.ModelState(undefined)`,
),
`const ${formatVariableName(handleId)} = ${EosOperatorsSymbol}.merge(
[${mainInputList.map((id) => formatVariableName(id)).join(',')}],
)`,
];
}

generateBlockOutput(
options: IGenerationOption<IMergeOperatorData<Record<string, any>>>,
): string[] {
return [];
}

generateBlockRelation(
options: IGenerationOption<IMergeOperatorData<Record<string, any>>>,
): string[] {
const { node, formatVariableName, nodeGraph } = options;

const mainInputList =
this.getHintPorts(node, 'input')?.map((item) => item.id) || [];

const sourceItems = nodeGraph.findSourceNodes(node.id) || [];
const sourceItemMap = new Map(
sourceItems.map((item) => [item.handleId, item]),
);

const relationList = mainInputList
.map((inputId) => ({
inputId,
targetId: sourceItemMap.get(inputId)?.relatedHandleId || '',
}))
.filter((item) => item.inputId && item.targetId);

return [
...relationList.map(
({ inputId, targetId }) => `${formatVariableName(
targetId,
)}.subscribe((val, extraInfo) => {
${formatVariableName(
inputId,
)}.update(val, extraInfo.concat('${JSON.stringify({
currentNodeId: node.id,
fromPortId: targetId,
toPortId: inputId,
})}'));
});`,
),
];
}
}
10 changes: 7 additions & 3 deletions packages/flow/src/Diagrams/Operators/Operator.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,15 @@ export abstract class MetaOperator<
}

// ============ START: Code Generation ============= //
abstract generateBlockDeclarations(options: IGenerationOption): string[];
abstract generateBlockDeclarations(options: IGenerationOption<T>): string[];

abstract generateBlockRelation(options: IGenerationOption): string[];
abstract generateBlockRelation(options: IGenerationOption<T>): string[];

abstract generateBlockOutput(options: IGenerationOption): string[];
abstract generateBlockOutput(options: IGenerationOption<T>): string[];

getIgnoreDegreeIds(node: Node<T>): string[] {
return [];
}

// ============ START: Hooks handler ============= //
onAfterCreate(options: IHookOption<Node<T>>): void {}
Expand Down
2 changes: 2 additions & 0 deletions packages/flow/src/Diagrams/Operators/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { SumOperator } from './SumOperator';
import { CustomOperator } from './CustomOperator';
import { CombineOperator } from './CombineOperator';
import { TransformOperator, EffectOperator } from './TransformOperator';
import { MergeOperator } from './MergeOperator';

import { registerOperators } from './OperatorMap';

Expand All @@ -18,6 +19,7 @@ registerOperators([
new CombineOperator(),
new TransformOperator(),
new EffectOperator(),
new MergeOperator(),
]);

export {
Expand Down
7 changes: 7 additions & 0 deletions packages/flow/src/Diagrams/Operators/types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ export class EndPoint {
allowAddAndRemoveChildren?: boolean;
variableType?: 'array' | 'object' | 'primary';
defaultChildData?: Partial<EndPoint>;
ignoreDegree?: boolean;

constructor(data: Partial<EndPoint>) {
Object.assign(this, data);
Expand Down Expand Up @@ -143,3 +144,9 @@ export interface IEffectOperatorData<
> extends ITranformOperatorData<NodeOptions> {
// noop
}

export interface IMergeOperatorData<
NodeOptions extends Record<string, any> = Record<string, any>,
> extends IMetaOperatorData<NodeOptions> {
// noop
}
Loading

0 comments on commit bc1b375

Please sign in to comment.