diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/model/websocket/event/OperatorStatisticsUpdateEvent.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/model/websocket/event/OperatorStatisticsUpdateEvent.scala index 3e66da4b946..b33b6f93e15 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/web/model/websocket/event/OperatorStatisticsUpdateEvent.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/model/websocket/event/OperatorStatisticsUpdateEvent.scala @@ -23,8 +23,10 @@ case class OperatorAggregatedMetrics( operatorState: String, aggregatedInputRowCount: Long, aggregatedInputSize: Long, + inputPortMetrics: Map[String, Long], aggregatedOutputRowCount: Long, aggregatedOutputSize: Long, + outputPortMetrics: Map[String, Long], numWorkers: Long, aggregatedDataProcessingTime: Long, aggregatedControlProcessingTime: Long, diff --git a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionStatsService.scala b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionStatsService.scala index 05523ea2afb..6aabe7c4eb5 100644 --- a/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionStatsService.scala +++ b/core/amber/src/main/scala/edu/uci/ics/texera/web/service/ExecutionStatsService.scala @@ -107,12 +107,21 @@ class ExecutionStatsService( OperatorStatisticsUpdateEvent(newState.operatorInfo.collect { case x => val metrics = x._2 + val inMap = metrics.operatorStatistics.inputMetrics + .map(pm => pm.portId.id.toString -> pm.tupleMetrics.count) + .toMap + val outMap = metrics.operatorStatistics.outputMetrics + .map(pm => pm.portId.id.toString -> pm.tupleMetrics.count) + .toMap + val res = OperatorAggregatedMetrics( Utils.aggregatedStateToString(metrics.operatorState), metrics.operatorStatistics.inputMetrics.map(_.tupleMetrics.count).sum, metrics.operatorStatistics.inputMetrics.map(_.tupleMetrics.size).sum, + inMap, metrics.operatorStatistics.outputMetrics.map(_.tupleMetrics.count).sum, metrics.operatorStatistics.outputMetrics.map(_.tupleMetrics.size).sum, + outMap, metrics.operatorStatistics.numWorkers, metrics.operatorStatistics.dataProcessingTime, metrics.operatorStatistics.controlProcessingTime, diff --git a/core/gui/src/app/workspace/component/code-editor-dialog/code-debugger.component.spec.ts b/core/gui/src/app/workspace/component/code-editor-dialog/code-debugger.component.spec.ts index 72d73f55de6..ec4e5a22dd4 100644 --- a/core/gui/src/app/workspace/component/code-editor-dialog/code-debugger.component.spec.ts +++ b/core/gui/src/app/workspace/component/code-editor-dialog/code-debugger.component.spec.ts @@ -98,7 +98,13 @@ describe("CodeDebuggerComponent", () => { // Emit a Running state event statusUpdateStream.next({ - [operatorId]: { operatorState: OperatorState.Running, aggregatedOutputRowCount: 0, aggregatedInputRowCount: 0 }, + [operatorId]: { + operatorState: OperatorState.Running, + aggregatedOutputRowCount: 0, + aggregatedInputRowCount: 0, + inputPortMetrics: {}, + outputPortMetrics: {}, + }, }); tick(); @@ -109,7 +115,13 @@ describe("CodeDebuggerComponent", () => { // Emit the same state again (should not trigger setup again) statusUpdateStream.next({ - [operatorId]: { operatorState: OperatorState.Running, aggregatedOutputRowCount: 0, aggregatedInputRowCount: 0 }, + [operatorId]: { + operatorState: OperatorState.Running, + aggregatedOutputRowCount: 0, + aggregatedInputRowCount: 0, + inputPortMetrics: {}, + outputPortMetrics: {}, + }, }); tick(); @@ -120,7 +132,13 @@ describe("CodeDebuggerComponent", () => { // Emit the paused state (should not trigger setup) statusUpdateStream.next({ - [operatorId]: { operatorState: OperatorState.Paused, aggregatedOutputRowCount: 0, aggregatedInputRowCount: 0 }, + [operatorId]: { + operatorState: OperatorState.Paused, + aggregatedOutputRowCount: 0, + aggregatedInputRowCount: 0, + inputPortMetrics: {}, + outputPortMetrics: {}, + }, }); tick(); @@ -131,7 +149,13 @@ describe("CodeDebuggerComponent", () => { // Emit the running state once more (should not trigger setup) statusUpdateStream.next({ - [operatorId]: { operatorState: OperatorState.Paused, aggregatedOutputRowCount: 0, aggregatedInputRowCount: 0 }, + [operatorId]: { + operatorState: OperatorState.Paused, + aggregatedOutputRowCount: 0, + aggregatedInputRowCount: 0, + inputPortMetrics: {}, + outputPortMetrics: {}, + }, }); tick(); @@ -150,6 +174,8 @@ describe("CodeDebuggerComponent", () => { operatorState: OperatorState.Uninitialized, aggregatedOutputRowCount: 0, aggregatedInputRowCount: 0, + inputPortMetrics: {}, + outputPortMetrics: {}, }, }); @@ -163,6 +189,8 @@ describe("CodeDebuggerComponent", () => { operatorState: OperatorState.Uninitialized, aggregatedOutputRowCount: 0, aggregatedInputRowCount: 0, + inputPortMetrics: {}, + outputPortMetrics: {}, }, }); diff --git a/core/gui/src/app/workspace/service/joint-ui/joint-ui.service.ts b/core/gui/src/app/workspace/service/joint-ui/joint-ui.service.ts index c15b7e88aa5..e7a4c9dd824 100644 --- a/core/gui/src/app/workspace/service/joint-ui/joint-ui.service.ts +++ b/core/gui/src/app/workspace/service/joint-ui/joint-ui.service.ts @@ -106,13 +106,13 @@ export const operatorViewResultIconClass = "texera-operator-view-result-icon"; export const operatorStateClass = "texera-operator-state"; export const operatorProcessedCountClass = "texera-operator-processed-count"; export const operatorOutputCountClass = "texera-operator-output-count"; -export const operatorAbbreviatedCountClass = "texera-operator-abbreviated-count"; export const operatorCoeditorEditingClass = "texera-operator-coeditor-editing"; export const operatorCoeditorChangedPropertyClass = "texera-operator-coeditor-changed-property"; export const operatorIconClass = "texera-operator-icon"; export const operatorNameClass = "texera-operator-name"; export const operatorFriendlyNameClass = "texera-operator-friendly-name"; +export const operatorPortMetricsClass = "texera-operator-port-metrics"; export const linkPathStrokeColor = "#919191"; @@ -129,9 +129,9 @@ class TexeraCustomJointElement extends joint.shapes.devs.Model { + - @@ -247,6 +247,7 @@ export class JointUIService { // set operator element ID to be operator ID operatorElement.set("id", operator.operatorID); + operatorElement.set("z", 1); // set the input ports and output ports based on operator predicate operator.inputPorts.forEach(port => @@ -288,7 +289,6 @@ export class JointUIService { jointPaper.getModelById(operatorID).attr({ [`.${operatorProcessedCountClass}`]: { text: "" }, [`.${operatorOutputCountClass}`]: { text: "" }, - [`.${operatorAbbreviatedCountClass}`]: { text: "" }, }); this.changeOperatorState(jointPaper, operatorID, OperatorState.Uninitialized); return; @@ -301,38 +301,96 @@ export class JointUIService { const processedCountText = isSource ? "" : abbreviateNumber(statistics.aggregatedInputRowCount); const outputCountText = isSink ? "" : abbreviateNumber(statistics.aggregatedOutputRowCount); const abbreviatedText = processedCountText + (isSource || isSink ? "" : " → ") + outputCountText; + + const element = jointPaper.getModelById(operatorID) as joint.shapes.devs.Model; + const allPorts = element.getPorts(); + const inPorts = allPorts.filter(p => p.group === "in"); + const outPorts = allPorts.filter(p => p.group === "out"); + + const inputMetrics = statistics.inputPortMetrics; + const outputMetrics = statistics.outputPortMetrics; + + inPorts.forEach(portDef => { + const portId = portDef.id; + if (portId != null) { + const parts = portId.split("-"); + const numericSuffix = parts.length > 1 ? parts[1] : portId; + + const count: number = inputMetrics[numericSuffix] ?? 0; + const rawAttrs = (portDef.attrs as any) || {}; + const oldText: string = (rawAttrs[".port-label"] && rawAttrs[".port-label"].text) || ""; + let originalName = oldText.includes(":") ? oldText.split(":", 1)[0].trim() : oldText; + + if (!originalName) { + originalName = portId; + } + + const labelText = `${originalName}: ${count}`; + + element.portProp(portId, "attrs/.port-label/text", labelText); + } + }); + + outPorts.forEach(portDef => { + const portId = portDef.id; + if (portId != null) { + const parts = portId.split("-"); + const numericSuffix = parts.length > 1 ? parts[1] : portId; + + const count: number = outputMetrics[numericSuffix] ?? 0; + const rawAttrs = (portDef.attrs as any) || {}; + const oldText: string = (rawAttrs[".port-label"] && rawAttrs[".port-label"].text) || ""; + let originalName = oldText.includes(":") ? oldText.split(":", 1)[0].trim() : oldText; + + if (!originalName) { + originalName = portId; + } + + const labelText = `${originalName}: ${count}`; + + element.portProp(portId, "attrs/.port-label/text", labelText); + } + }); + jointPaper.getModelById(operatorID).attr({ [`.${operatorProcessedCountClass}`]: isSink ? { text: processedText, "ref-y": -30 } : { text: processedText }, [`.${operatorOutputCountClass}`]: { text: outputText }, - [`.${operatorAbbreviatedCountClass}`]: { text: abbreviatedText }, }); + + this.changeOperatorState(jointPaper, operatorID, statistics.operatorState); } public foldOperatorDetails(jointPaper: joint.dia.Paper, operatorID: string): void { jointPaper.getModelById(operatorID).attr({ - [`.${operatorAbbreviatedCountClass}`]: { visibility: "visible" }, [`.${operatorProcessedCountClass}`]: { visibility: "hidden" }, [`.${operatorOutputCountClass}`]: { visibility: "hidden" }, [`.${operatorStateClass}`]: { visibility: "hidden" }, + [`.${operatorPortMetricsClass}`]: { visibility: "hidden" }, ".delete-button": { visibility: "hidden" }, ".add-input-port-button": { visibility: "hidden" }, ".add-output-port-button": { visibility: "hidden" }, ".remove-input-port-button": { visibility: "hidden" }, ".remove-output-port-button": { visibility: "hidden" }, }); + const element = jointPaper.getModelById(operatorID) as joint.shapes.devs.Model; } public unfoldOperatorDetails(jointPaper: joint.dia.Paper, operatorID: string): void { jointPaper.getModelById(operatorID).attr({ - [`.${operatorAbbreviatedCountClass}`]: { visibility: "hidden" }, [`.${operatorProcessedCountClass}`]: { visibility: "visible" }, [`.${operatorOutputCountClass}`]: { visibility: "visible" }, [`.${operatorStateClass}`]: { visibility: "visible" }, + [`.${operatorPortMetricsClass}`]: { visibility: "visible" }, ".delete-button": { visibility: "visible" }, ".add-input-port-button": { visibility: "visible" }, ".add-output-port-button": { visibility: "visible" }, ".remove-input-port-button": { visibility: "visible" }, ".remove-output-port-button": { visibility: "visible" }, }); + + const element = jointPaper.getModelById(operatorID) as joint.shapes.devs.Model; + if (!element) { + return; + } } public changeOperatorState(jointPaper: joint.dia.Paper, operatorID: string, operatorState: OperatorState): void { @@ -359,9 +417,24 @@ export class JointUIService { [`.${operatorStateClass}`]: { text: operatorState.toString() }, [`.${operatorStateClass}`]: { fill: fillColor }, "rect.body": { stroke: fillColor }, - [`.${operatorAbbreviatedCountClass}`]: { fill: fillColor }, [`.${operatorProcessedCountClass}`]: { fill: fillColor }, [`.${operatorOutputCountClass}`]: { fill: fillColor }, + [`.${operatorPortMetricsClass}`]: { fill: fillColor }, + }); + const element = jointPaper.getModelById(operatorID) as joint.shapes.devs.Model; + const allPorts = element.getPorts(); + const inPorts = allPorts.filter(p => p.group === "in"); + inPorts.forEach(p => { + if (p.id != null) { + element.portProp(p.id, "attrs/.port-label/fill", fillColor); + } + }); + + const outPorts = allPorts.filter(p => p.group === "out"); + outPorts.forEach(p => { + if (p.id != null) { + element.portProp(p.id, "attrs/.port-label/fill", fillColor); + } }); } @@ -488,6 +561,7 @@ export class JointUIService { port: link.target.portID, }); jointLinkCell.set("id", link.linkID); + jointLinkCell.set("z", 0); return jointLinkCell; } @@ -578,9 +652,13 @@ export class JointUIService { stroke: "none", }, ".port-label": { + visibility: "visible", event: "input-label:evt", dblclick: "input-label:dbclick", pointerdblclick: "input-label:pointerdblclick", + ref: ".port-body", + "ref-y": 0.5, + "y-alignment": "middle", }, }; } @@ -680,6 +758,17 @@ export class JointUIService { "y-alignment": "middle", "x-alignment": "middle", }, + ".texera-operator-port-metrics": { + text: "", + fill: "green", + "font-size": "14px", + visibility: "hidden", + "ref-x": 0.5, + "ref-y": -70, + ref: "rect.body", + "y-alignment": "middle", + "x-alignment": "middle", + }, ".texera-operator-processed-count": { text: "", fill: "green", diff --git a/core/gui/src/app/workspace/service/operator-debug/udf-debug.service.spec.ts b/core/gui/src/app/workspace/service/operator-debug/udf-debug.service.spec.ts index 342aef39702..92a9a53848d 100644 --- a/core/gui/src/app/workspace/service/operator-debug/udf-debug.service.spec.ts +++ b/core/gui/src/app/workspace/service/operator-debug/udf-debug.service.spec.ts @@ -200,6 +200,8 @@ describe("UdfDebugServiceSpec", () => { operatorState: OperatorState.Uninitialized, aggregatedInputRowCount: 0, aggregatedOutputRowCount: 0, + inputPortMetrics: {}, + outputPortMetrics: {}, }, }); diff --git a/core/gui/src/app/workspace/service/workflow-status/workflow-status.service.ts b/core/gui/src/app/workspace/service/workflow-status/workflow-status.service.ts index 8e483a57d39..e939932aeba 100644 --- a/core/gui/src/app/workspace/service/workflow-status/workflow-status.service.ts +++ b/core/gui/src/app/workspace/service/workflow-status/workflow-status.service.ts @@ -55,7 +55,9 @@ export class WorkflowStatusService { accumulator[operatorId] = { operatorState: OperatorState.Uninitialized, aggregatedInputRowCount: 0, + inputPortMetrics: {}, aggregatedOutputRowCount: 0, + outputPortMetrics: {}, }; return accumulator; }, diff --git a/core/gui/src/app/workspace/types/execute-workflow.interface.ts b/core/gui/src/app/workspace/types/execute-workflow.interface.ts index 02f66701d0c..bf6482d06ae 100644 --- a/core/gui/src/app/workspace/types/execute-workflow.interface.ts +++ b/core/gui/src/app/workspace/types/execute-workflow.interface.ts @@ -81,7 +81,9 @@ export interface OperatorStatistics extends Readonly<{ operatorState: OperatorState; aggregatedInputRowCount: number; + inputPortMetrics: Record; aggregatedOutputRowCount: number; + outputPortMetrics: Record; }> {} export interface OperatorStatsUpdate