Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ case class OperatorAggregatedMetrics(
operatorState: String,
aggregatedInputRowCount: Long,
aggregatedInputSize: Long,
inputPortMetrics: Map[String, Long],
aggregatedOutputRowCount: Long,
aggregatedOutputSize: Long,
outputPortMetrics: Map[String, Long],
numWorkers: Long,
aggregatedDataProcessingTime: Long,
aggregatedControlProcessingTime: Long,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,21 @@ class ExecutionStatsService(
OperatorStatisticsUpdateEvent(newState.operatorInfo.collect {
case x =>
val metrics = x._2
val inMap = metrics.operatorStatistics.inputMetrics
.map(pm => pm.portId.id.toString -> pm.tupleMetrics.count)
.toMap
val outMap = metrics.operatorStatistics.outputMetrics
.map(pm => pm.portId.id.toString -> pm.tupleMetrics.count)
.toMap

val res = OperatorAggregatedMetrics(
Utils.aggregatedStateToString(metrics.operatorState),
metrics.operatorStatistics.inputMetrics.map(_.tupleMetrics.count).sum,
metrics.operatorStatistics.inputMetrics.map(_.tupleMetrics.size).sum,
inMap,
metrics.operatorStatistics.outputMetrics.map(_.tupleMetrics.count).sum,
metrics.operatorStatistics.outputMetrics.map(_.tupleMetrics.size).sum,
outMap,
metrics.operatorStatistics.numWorkers,
metrics.operatorStatistics.dataProcessingTime,
metrics.operatorStatistics.controlProcessingTime,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,13 @@ describe("CodeDebuggerComponent", () => {

// Emit a Running state event
statusUpdateStream.next({
[operatorId]: { operatorState: OperatorState.Running, aggregatedOutputRowCount: 0, aggregatedInputRowCount: 0 },
[operatorId]: {
operatorState: OperatorState.Running,
aggregatedOutputRowCount: 0,
aggregatedInputRowCount: 0,
inputPortMetrics: {},
outputPortMetrics: {},
},
});

tick();
Expand All @@ -109,7 +115,13 @@ describe("CodeDebuggerComponent", () => {

// Emit the same state again (should not trigger setup again)
statusUpdateStream.next({
[operatorId]: { operatorState: OperatorState.Running, aggregatedOutputRowCount: 0, aggregatedInputRowCount: 0 },
[operatorId]: {
operatorState: OperatorState.Running,
aggregatedOutputRowCount: 0,
aggregatedInputRowCount: 0,
inputPortMetrics: {},
outputPortMetrics: {},
},
});

tick();
Expand All @@ -120,7 +132,13 @@ describe("CodeDebuggerComponent", () => {

// Emit the paused state (should not trigger setup)
statusUpdateStream.next({
[operatorId]: { operatorState: OperatorState.Paused, aggregatedOutputRowCount: 0, aggregatedInputRowCount: 0 },
[operatorId]: {
operatorState: OperatorState.Paused,
aggregatedOutputRowCount: 0,
aggregatedInputRowCount: 0,
inputPortMetrics: {},
outputPortMetrics: {},
},
});

tick();
Expand All @@ -131,7 +149,13 @@ describe("CodeDebuggerComponent", () => {

// Emit the running state once more (should not trigger setup)
statusUpdateStream.next({
[operatorId]: { operatorState: OperatorState.Paused, aggregatedOutputRowCount: 0, aggregatedInputRowCount: 0 },
[operatorId]: {
operatorState: OperatorState.Paused,
aggregatedOutputRowCount: 0,
aggregatedInputRowCount: 0,
inputPortMetrics: {},
outputPortMetrics: {},
},
});

tick();
Expand All @@ -150,6 +174,8 @@ describe("CodeDebuggerComponent", () => {
operatorState: OperatorState.Uninitialized,
aggregatedOutputRowCount: 0,
aggregatedInputRowCount: 0,
inputPortMetrics: {},
outputPortMetrics: {},
},
});

Expand All @@ -163,6 +189,8 @@ describe("CodeDebuggerComponent", () => {
operatorState: OperatorState.Uninitialized,
aggregatedOutputRowCount: 0,
aggregatedInputRowCount: 0,
inputPortMetrics: {},
outputPortMetrics: {},
},
});

Expand Down
103 changes: 96 additions & 7 deletions core/gui/src/app/workspace/service/joint-ui/joint-ui.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,13 @@ export const operatorViewResultIconClass = "texera-operator-view-result-icon";
export const operatorStateClass = "texera-operator-state";
export const operatorProcessedCountClass = "texera-operator-processed-count";
export const operatorOutputCountClass = "texera-operator-output-count";
export const operatorAbbreviatedCountClass = "texera-operator-abbreviated-count";
export const operatorCoeditorEditingClass = "texera-operator-coeditor-editing";
export const operatorCoeditorChangedPropertyClass = "texera-operator-coeditor-changed-property";

export const operatorIconClass = "texera-operator-icon";
export const operatorNameClass = "texera-operator-name";
export const operatorFriendlyNameClass = "texera-operator-friendly-name";
export const operatorPortMetricsClass = "texera-operator-port-metrics";

export const linkPathStrokeColor = "#919191";

Expand All @@ -129,9 +129,9 @@ class TexeraCustomJointElement extends joint.shapes.devs.Model {
<image class="${operatorIconClass}"></image>
<text class="${operatorFriendlyNameClass}"></text>
<text class="${operatorNameClass}"></text>
<text class="${operatorPortMetricsClass}"></text>
<text class="${operatorProcessedCountClass}"></text>
<text class="${operatorOutputCountClass}"></text>
<text class="${operatorAbbreviatedCountClass}"></text>
<text class="${operatorStateClass}"></text>
<text class="${operatorReuseCacheTextClass}"></text>
<text class="${operatorCoeditorEditingClass}"></text>
Expand Down Expand Up @@ -247,6 +247,7 @@ export class JointUIService {

// set operator element ID to be operator ID
operatorElement.set("id", operator.operatorID);
operatorElement.set("z", 1);

// set the input ports and output ports based on operator predicate
operator.inputPorts.forEach(port =>
Expand Down Expand Up @@ -288,7 +289,6 @@ export class JointUIService {
jointPaper.getModelById(operatorID).attr({
[`.${operatorProcessedCountClass}`]: { text: "" },
[`.${operatorOutputCountClass}`]: { text: "" },
[`.${operatorAbbreviatedCountClass}`]: { text: "" },
});
this.changeOperatorState(jointPaper, operatorID, OperatorState.Uninitialized);
return;
Expand All @@ -301,38 +301,96 @@ export class JointUIService {
const processedCountText = isSource ? "" : abbreviateNumber(statistics.aggregatedInputRowCount);
const outputCountText = isSink ? "" : abbreviateNumber(statistics.aggregatedOutputRowCount);
const abbreviatedText = processedCountText + (isSource || isSink ? "" : " → ") + outputCountText;

const element = jointPaper.getModelById(operatorID) as joint.shapes.devs.Model;
const allPorts = element.getPorts();
const inPorts = allPorts.filter(p => p.group === "in");
const outPorts = allPorts.filter(p => p.group === "out");

const inputMetrics = statistics.inputPortMetrics;
const outputMetrics = statistics.outputPortMetrics;

inPorts.forEach(portDef => {
const portId = portDef.id;
if (portId != null) {
const parts = portId.split("-");
const numericSuffix = parts.length > 1 ? parts[1] : portId;

const count: number = inputMetrics[numericSuffix] ?? 0;
const rawAttrs = (portDef.attrs as any) || {};
const oldText: string = (rawAttrs[".port-label"] && rawAttrs[".port-label"].text) || "";
let originalName = oldText.includes(":") ? oldText.split(":", 1)[0].trim() : oldText;

if (!originalName) {
Comment thread
aglinxinyuan marked this conversation as resolved.
originalName = portId;
}

const labelText = `${originalName}: ${count}`;

element.portProp(portId, "attrs/.port-label/text", labelText);
}
});

outPorts.forEach(portDef => {
const portId = portDef.id;
if (portId != null) {
const parts = portId.split("-");
const numericSuffix = parts.length > 1 ? parts[1] : portId;

const count: number = outputMetrics[numericSuffix] ?? 0;
const rawAttrs = (portDef.attrs as any) || {};
const oldText: string = (rawAttrs[".port-label"] && rawAttrs[".port-label"].text) || "";
let originalName = oldText.includes(":") ? oldText.split(":", 1)[0].trim() : oldText;

if (!originalName) {
Comment thread
aglinxinyuan marked this conversation as resolved.
originalName = portId;
}

const labelText = `${originalName}: ${count}`;

element.portProp(portId, "attrs/.port-label/text", labelText);
}
});

jointPaper.getModelById(operatorID).attr({
[`.${operatorProcessedCountClass}`]: isSink ? { text: processedText, "ref-y": -30 } : { text: processedText },
[`.${operatorOutputCountClass}`]: { text: outputText },
[`.${operatorAbbreviatedCountClass}`]: { text: abbreviatedText },
});

this.changeOperatorState(jointPaper, operatorID, statistics.operatorState);
}
public foldOperatorDetails(jointPaper: joint.dia.Paper, operatorID: string): void {
jointPaper.getModelById(operatorID).attr({
[`.${operatorAbbreviatedCountClass}`]: { visibility: "visible" },
[`.${operatorProcessedCountClass}`]: { visibility: "hidden" },
[`.${operatorOutputCountClass}`]: { visibility: "hidden" },
[`.${operatorStateClass}`]: { visibility: "hidden" },
[`.${operatorPortMetricsClass}`]: { visibility: "hidden" },
".delete-button": { visibility: "hidden" },
".add-input-port-button": { visibility: "hidden" },
".add-output-port-button": { visibility: "hidden" },
".remove-input-port-button": { visibility: "hidden" },
".remove-output-port-button": { visibility: "hidden" },
});
const element = jointPaper.getModelById(operatorID) as joint.shapes.devs.Model;
}

public unfoldOperatorDetails(jointPaper: joint.dia.Paper, operatorID: string): void {
jointPaper.getModelById(operatorID).attr({
[`.${operatorAbbreviatedCountClass}`]: { visibility: "hidden" },
[`.${operatorProcessedCountClass}`]: { visibility: "visible" },
[`.${operatorOutputCountClass}`]: { visibility: "visible" },
[`.${operatorStateClass}`]: { visibility: "visible" },
[`.${operatorPortMetricsClass}`]: { visibility: "visible" },
".delete-button": { visibility: "visible" },
".add-input-port-button": { visibility: "visible" },
".add-output-port-button": { visibility: "visible" },
".remove-input-port-button": { visibility: "visible" },
".remove-output-port-button": { visibility: "visible" },
});

const element = jointPaper.getModelById(operatorID) as joint.shapes.devs.Model;
if (!element) {
Comment thread
jaeyun0503 marked this conversation as resolved.
return;
}
}

public changeOperatorState(jointPaper: joint.dia.Paper, operatorID: string, operatorState: OperatorState): void {
Expand All @@ -359,9 +417,24 @@ export class JointUIService {
[`.${operatorStateClass}`]: { text: operatorState.toString() },
[`.${operatorStateClass}`]: { fill: fillColor },
"rect.body": { stroke: fillColor },
[`.${operatorAbbreviatedCountClass}`]: { fill: fillColor },
[`.${operatorProcessedCountClass}`]: { fill: fillColor },
[`.${operatorOutputCountClass}`]: { fill: fillColor },
[`.${operatorPortMetricsClass}`]: { fill: fillColor },
});
const element = jointPaper.getModelById(operatorID) as joint.shapes.devs.Model;
const allPorts = element.getPorts();
const inPorts = allPorts.filter(p => p.group === "in");
inPorts.forEach(p => {
if (p.id != null) {
element.portProp(p.id, "attrs/.port-label/fill", fillColor);
}
});

const outPorts = allPorts.filter(p => p.group === "out");
outPorts.forEach(p => {
if (p.id != null) {
element.portProp(p.id, "attrs/.port-label/fill", fillColor);
}
});
}

Expand Down Expand Up @@ -488,6 +561,7 @@ export class JointUIService {
port: link.target.portID,
});
jointLinkCell.set("id", link.linkID);
jointLinkCell.set("z", 0);
return jointLinkCell;
}

Expand Down Expand Up @@ -578,9 +652,13 @@ export class JointUIService {
stroke: "none",
},
".port-label": {
visibility: "visible",
event: "input-label:evt",
dblclick: "input-label:dbclick",
pointerdblclick: "input-label:pointerdblclick",
ref: ".port-body",
"ref-y": 0.5,
"y-alignment": "middle",
},
};
}
Expand Down Expand Up @@ -680,6 +758,17 @@ export class JointUIService {
"y-alignment": "middle",
"x-alignment": "middle",
},
".texera-operator-port-metrics": {
text: "",
fill: "green",
"font-size": "14px",
visibility: "hidden",
"ref-x": 0.5,
"ref-y": -70,
ref: "rect.body",
"y-alignment": "middle",
"x-alignment": "middle",
},
".texera-operator-processed-count": {
text: "",
fill: "green",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,8 @@ describe("UdfDebugServiceSpec", () => {
operatorState: OperatorState.Uninitialized,
aggregatedInputRowCount: 0,
aggregatedOutputRowCount: 0,
inputPortMetrics: {},
outputPortMetrics: {},
},
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@ export class WorkflowStatusService {
accumulator[operatorId] = {
operatorState: OperatorState.Uninitialized,
aggregatedInputRowCount: 0,
inputPortMetrics: {},
aggregatedOutputRowCount: 0,
outputPortMetrics: {},
};
return accumulator;
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ export interface OperatorStatistics
extends Readonly<{
operatorState: OperatorState;
aggregatedInputRowCount: number;
inputPortMetrics: Record<string, number>;
aggregatedOutputRowCount: number;
outputPortMetrics: Record<string, number>;
}> {}

export interface OperatorStatsUpdate
Expand Down