Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1535,21 +1535,30 @@
if (_.isNumber(percentUseCount)) {
var objectThreshold = _.get(d, 'component.backPressureObjectThreshold');

var predictionsAvailable = _.get(d, 'status.aggregateSnapshot.predictionsAvailable', false);
var predictedPercentCount = _.get(d, 'status.aggregateSnapshot.predictedPercentCount', -1);
var timeToBackPressure = _.get(d, 'status.aggregateSnapshot.predictedMillisUntilCountBackpressure', -1);

var tooltipLines = ['Queue: ' + _.clamp(percentUseCount, 0, 100) + '% full (based on ' + objectThreshold + ' object threshold)'];

// only show predicted percent if it is non-negative
if (_.isNumber(predictedPercentCount) && predictedPercentCount > -1) {
if (predictionsAvailable) {
// only show predicted percent if it is non-negative
var predictionIntervalSeconds = _.get(d, 'status.aggregateSnapshot.predictionIntervalSeconds', 60 * 5);
tooltipLines.push('Predicted queue (next ' + (predictionIntervalSeconds / 60 ) + ' mins): ' + _.clamp(predictedPercentCount, 0, 100) + '%')
}
if (_.isNumber(predictedPercentCount) && predictedPercentCount > -1) {
tooltipLines.push('Predicted queue (next ' + (predictionIntervalSeconds / 60 ) + ' mins): ' + _.clamp(predictedPercentCount, 0, 100) + '%')
} else {
tooltipLines.push('Predicted queue (next ' + (predictionIntervalSeconds / 60 ) + ' mins): NA' )
}

// only show an estimate if it is valid (non-negative but less than the max number supported)
if (_.isNumber(timeToBackPressure) && _.inRange(timeToBackPressure, 0, Number.MAX_SAFE_INTEGER) && !isAtBackPressure(d)) {
var duration = nfCommon.formatPredictedDuration(timeToBackPressure);
tooltipLines.push('Estimated time to back pressure: ' + duration);
// only show an estimate if it is valid (non-negative but less than the max number supported)
if (_.isNumber(timeToBackPressure) && _.inRange(timeToBackPressure, 0, Number.MAX_SAFE_INTEGER) && !isAtBackPressure(d)) {
var duration = nfCommon.formatPredictedDuration(timeToBackPressure);
tooltipLines.push('Estimated time to back pressure: ' + duration);
} else {
tooltipLines.push('Estimated time to back pressure: ' + (isAtBackPressure(d) ? 'now' : 'NA'));
}
} else {
tooltipLines.push('Queue Prediction is not configured')
}

if (_.isEmpty(tooltipLines)) {
Expand Down Expand Up @@ -1577,21 +1586,30 @@
if (_.isNumber(percentUseBytes)) {
var dataSizeThreshold = _.get(d, 'component.backPressureDataSizeThreshold');

var predictionsAvailable = _.get(d, 'status.aggregateSnapshot.predictionsAvailable', false);
var predictedPercentBytes = _.get(d, 'status.aggregateSnapshot.predictedPercentBytes', -1);
var timeToBackPressure = _.get(d, 'status.aggregateSnapshot.predictedMillisUntilBytesBackpressure', -1);

var tooltipLines = ['Queue: ' + _.clamp(percentUseBytes, 0, 100) + '% full (based on ' + dataSizeThreshold + ' data size threshold)'];

// only show predicted percent if it is non-negative
if (_.isNumber(predictedPercentBytes) && predictedPercentBytes > -1) {
if (predictionsAvailable) {
// only show predicted percent if it is non-negative
var predictionIntervalSeconds = _.get(d, 'status.aggregateSnapshot.predictionIntervalSeconds', 60 * 5);
tooltipLines.push('Predicted queue (next ' + (predictionIntervalSeconds / 60 ) + ' mins): ' + _.clamp(predictedPercentBytes, 0, 100) + '%')
}
if (_.isNumber(predictedPercentBytes) && predictedPercentBytes > -1) {
tooltipLines.push('Predicted queue (next ' + (predictionIntervalSeconds / 60) + ' mins): ' + _.clamp(predictedPercentBytes, 0, 100) + '%')
} else {
tooltipLines.push('Predicted queue (next ' + (predictionIntervalSeconds / 60 ) + ' mins): NA' )
}

// only show an estimate if it is valid (non-negative but less than the max number supported)
if (_.isNumber(timeToBackPressure) && _.inRange(timeToBackPressure, 0, Number.MAX_SAFE_INTEGER) && !isAtBackPressure(d)) {
var duration = nfCommon.formatPredictedDuration(timeToBackPressure);
tooltipLines.push('Estimated time to back pressure: ' + duration);
// only show an estimate if it is valid (non-negative but less than the max number supported)
if (_.isNumber(timeToBackPressure) && _.inRange(timeToBackPressure, 0, Number.MAX_SAFE_INTEGER) && !isAtBackPressure(d)) {
var duration = nfCommon.formatPredictedDuration(timeToBackPressure);
tooltipLines.push('Estimated time to back pressure: ' + duration);
} else {
tooltipLines.push('Estimated time to back pressure: ' + (isAtBackPressure(d) ? 'now' : 'NA'));
}
} else {
tooltipLines.push('Queue Prediction is not configured')
}

if (_.isEmpty(tooltipLines)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,50 @@
}
};


var backpressurePredictionFormatter = function (row, cell, value, columnDef, dataContext) {
var predictedMillisUntilBytesBackpressure = _.get(dataContext, 'predictedMillisUntilBytesBackpressure', -1);
var predictedMillisUntilCountBackpressure = _.get(dataContext, 'predictedMillisUntilCountBackpressure', -1);

var percentUseCount = _.get(dataContext, 'percentUseCount', 0);
var percentUseBytes = _.get(dataContext, 'percentUseBytes', 0);

var predictions = [
{ label: 'object', timeToBackPressure: predictedMillisUntilCountBackpressure },
{ label: 'size', timeToBackPressure: predictedMillisUntilBytesBackpressure },
];
var actualQueuePercents = [
{ label: 'object', percent: percentUseCount },
{ label: 'size', percent: percentUseBytes }
];

var minPrediction = _.minBy(predictions, 'timeToBackPressure');
var maxActual = _.maxBy(actualQueuePercents, 'percent');

if (maxActual.percent >= 100) {
// currently experiencing back pressure
return 'now (' + maxActual.label + ')';
} else if (minPrediction.timeToBackPressure < 0) {
// there is not a valid time-to-back-pressure prediction available
return 'NA';
}

var formatted = nfCommon.formatPredictedDuration(minPrediction.timeToBackPressure);
return nfCommon.escapeHtml(formatted) + ' (' + minPrediction.label + ')';
};

// define the column used to display backpressure predicted values (reused in both tables)
var backpressurePredictionColumn = {
id: 'backpressurePrediction',
field: 'backpressurePrediction',
name: 'Estimated Time to Back Pressure',
sortable: true,
defaultSortAsc: false,
formatter: backpressurePredictionFormatter,
resize: true,
toolTip: 'Estimated Time to Back Pressure'
};

/**
* Initializes the summary table.
*
Expand Down Expand Up @@ -729,37 +773,6 @@
return nfCommon.escapeHtml(percentUseCount) + DATA_SEPARATOR + nfCommon.escapeHtml(percentUseBytes);
};

var backpressurePredictionFormatter = function (row, cell, value, columnDef, dataContext) {
var predictedMillisUntilBytesBackpressure = _.get(dataContext, 'predictedMillisUntilBytesBackpressure', -1);
var predictedMillisUntilCountBackpressure = _.get(dataContext, 'predictedMillisUntilCountBackpressure', -1);

var percentUseCount = _.get(dataContext, 'percentUseCount', 0);
var percentUseBytes = _.get(dataContext, 'percentUseBytes', 0);

var predictions = [
{ label: 'object', timeToBackPressure: predictedMillisUntilCountBackpressure },
{ label: 'size', timeToBackPressure: predictedMillisUntilBytesBackpressure },
];
var actualQueuePercents = [
{ label: 'object', percent: percentUseCount },
{ label: 'size', percent: percentUseBytes }
];

var minPrediction = _.minBy(predictions, 'timeToBackPressure');
var maxActual = _.maxBy(actualQueuePercents, 'percent');

if (maxActual.percent >= 100) {
// currently experiencing back pressure
return 'now (' + maxActual.label + ')';
} else if (minPrediction.timeToBackPressure < 0) {
// there is not a valid time-to-back-pressure prediction available
return 'NA';
}

var formatted = nfCommon.formatPredictedDuration(minPrediction.timeToBackPressure);
return nfCommon.escapeHtml(formatted) + ' (' + minPrediction.label + ')';
};

// define the input, read, written, and output columns (reused between both tables)
var queueColumn = {
id: 'queued',
Expand All @@ -782,18 +795,6 @@
resize: true
};

// define the column used to display backpressure predicted values (reused in both tables)
var backpressurePredictionColumn = {
id: 'backpressurePrediction',
field: 'backpressurePrediction',
name: 'Estimated Time to Back Pressure',
sortable: true,
defaultSortAsc: false,
formatter: backpressurePredictionFormatter,
resize: true,
toolTip: 'Estimated Time to Back Pressure'
};

// define the column model for the summary table
var connectionsColumnModel = [
{
Expand Down Expand Up @@ -2856,10 +2857,40 @@
predictedMillisUntilBytesBackpressure: snapshot.predictedMillisUntilBytesBackpressure,
predictedMillisUntilCountBackpressure: snapshot.predictedMillisUntilCountBackpressure,
predictionIntervalSeconds: snapshot.predictionIntervalSeconds,
predictionsAvailable: snapshot.predictionsAvailable,
output: snapshot.output
});
});

// determine if the backpressure prediction column should be displayed or not
var anyPredictionsDisabled = _.some(clusterConnections, function (connectionItem) {
return connectionItem.predictionsAvailable !== true
});
var currentConnectionColumns = clusterConnectionsGrid.getColumns();
if (anyPredictionsDisabled) {
var connectionColumnsNoPredictedBackPressure = currentConnectionColumns.filter(function (column) {
return column.id !== backpressurePredictionColumn.id;
});
clusterConnectionsGrid.setColumns(connectionColumnsNoPredictedBackPressure);
} else {
// make sure the prediction column is there
var backPressurePredictionColumnIndex = currentConnectionColumns.findIndex(function (column) {
return column.id === backpressurePredictionColumn.id;
});
// if it is not there, add it immediately after the backpressure column
if (backPressurePredictionColumnIndex < 0) {
var backpressureColumnIndex = currentConnectionColumns.findIndex(function (column) {
return column.id === 'backpressure';
});
if (backpressureColumnIndex < 0) {
currentConnectionColumns.push(backpressurePredictionColumn);
} else {
currentConnectionColumns.splice(backpressureColumnIndex + 1, 0, backpressurePredictionColumn);
}
clusterConnectionsGrid.setColumns(currentConnectionColumns);
}
}

// update the processors
clusterConnectionsData.setItems(clusterConnections);
clusterConnectionsData.reSort();
Expand Down Expand Up @@ -3280,6 +3311,34 @@
// populate the tables
populateProcessGroupStatus(processorItems, connectionItems, processGroupItems, inputPortItems, outputPortItems, remoteProcessGroupItems, aggregateSnapshot, [aggregateSnapshot]);

var anyPredictionsDisabled = _.some(connectionItems, function (connectionItem) {
return connectionItem.predictionsAvailable !== true
});
var currentConnectionColumns = connectionsGrid.getColumns();
if (anyPredictionsDisabled) {
var connectionColumnsNoPredictedBackPressure = currentConnectionColumns.filter(function (column) {
return column.id !== backpressurePredictionColumn.id;
});
connectionsGrid.setColumns(connectionColumnsNoPredictedBackPressure);
} else {
// make sure the prediction column is there
var backPressurePredictionColumnIndex = currentConnectionColumns.findIndex(function (column) {
return column.id === backpressurePredictionColumn.id;
});
// if it is not there, add it immediately after the backpressure column
if (backPressurePredictionColumnIndex < 0) {
var backpressureColumnIndex = currentConnectionColumns.findIndex(function (column) {
return column.id === 'backpressure';
});
if (backpressureColumnIndex < 0) {
currentConnectionColumns.push(backpressurePredictionColumn);
} else {
currentConnectionColumns.splice(backpressureColumnIndex + 1, 0, backpressurePredictionColumn);
}
connectionsGrid.setColumns(currentConnectionColumns);
}
}

// update the processors
processorsData.setItems(processorItems);
processorsData.reSort();
Expand Down