Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ public abstract class NiFiProperties {
// analytics properties
public static final String ANALYTICS_PREDICTION_ENABLED = "nifi.analytics.predict.enabled";
public static final String ANALYTICS_PREDICTION_INTERVAL = "nifi.analytics.predict.interval";
public static final String ANALYTICS_QUERY_INTERVAL = "nifi.analytics.query.interval";
public static final String ANALYTICS_CONNECTION_MODEL_IMPLEMENTATION = "nifi.analytics.connection.model.implementation";
public static final String ANALYTICS_CONNECTION_MODEL_SCORE_NAME= "nifi.analytics.connection.model.score.name";
public static final String ANALYTICS_CONNECTION_MODEL_SCORE_THRESHOLD = "nifi.analytics.connection.model.score.threshold";
Expand Down Expand Up @@ -318,6 +319,7 @@ public abstract class NiFiProperties {
// analytics defaults
public static final String DEFAULT_ANALYTICS_PREDICTION_ENABLED = "false";
public static final String DEFAULT_ANALYTICS_PREDICTION_INTERVAL = "3 mins";
public static final String DEFAULT_ANALYTICS_QUERY_INTERVAL = "3 mins";
public final static String DEFAULT_ANALYTICS_CONNECTION_MODEL_IMPLEMENTATION = "org.apache.nifi.controller.status.analytics.models.OrdinaryLeastSquares";
public static final String DEFAULT_ANALYTICS_CONNECTION_SCORE_NAME = "rSquared";
public static final double DEFAULT_ANALYTICS_CONNECTION_SCORE_THRESHOLD = .90;
Expand Down
3 changes: 2 additions & 1 deletion nifi-docs/src/main/asciidoc/administration-guide.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -2389,7 +2389,7 @@ In order to generate predictions, local status snapshot history is queried to ob

NiFi evaluates the model's effectiveness before sending prediction information by using the model's R-Squared score by default. One important note: R-Square is a measure of how close the regression line fits the observation data vs. how accurate the prediction will be; therefore there may be some measure of error. If the R-Squared score for the calculated model meets the configured threshold (as defined by `nifi.analytics.connection.model.score.threshold`) then the model will be used for prediction. Otherwise the model will not be used and predictions will not be available until a model is generated with a score that exceeds the threshold. Default R-Squared threshold value is `.9` however this can be tuned based on prediction requirements.

The prediction interval `nifi.analytics.predict.interval` can be configured to project out further when back pressure will occur. Predictions further out in time require more observations stored locally to generate an effective model. This may also require tuning of the model's scoring threshold value to select a score which can offer reasonable predictions.
The prediction interval `nifi.analytics.predict.interval` can be configured to project out further when back pressure will occur. The prediction query interval `nifi.analytics.query.interval` can also be configured to determine how far back in time past observations should be queried in order to generate the model. Adjustments to these settings may require tuning of the model's scoring threshold value to select a score that can offer reasonable predictions.

See <<analytics_properties>> for complete information on configuring analytic properties.

Expand Down Expand Up @@ -3341,6 +3341,7 @@ These properties determine the behavior of the internal NiFi predictive analytic
|*Property*|*Description*
|`nifi.analytics.predict.enabled`|This indicates whether prediction should be enabled for the cluster. The default is `false`.
|`nifi.analytics.predict.interval`|This indicates a time interval for which analytical predictions (queue saturation, e.g.) should be made. The default value is `3 mins`.
|`nifi.analytics.query.interval`|This indicates a time interval to query for past observations (e.g. the last 3 minutes of snapshots). The default value is `3 mins`. NOTE: This value should be at least 3 times greater than `nifi.components.status.snapshot.frequency` to ensure enough observations are retrieved for predictions.
|`nifi.analytics.connection.model.implementation`|This is the implementation class for the status analytics model used to make connection predictions. The default value is `org.apache.nifi.controller.status.analytics.models.OrdinaryLeastSquares`.
|`nifi.analytics.connection.model.score.name`|This is the name of the scoring type that should be used to evaluate model. The default value is `rSquared`.
|`nifi.analytics.connection.model.score.threshold`|This is the threshold for the scoring value (where model score should be above given threshold). The default value is `.9`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,17 @@ private FlowController(
predictionIntervalMillis = FormatUtils.getTimeDuration(NiFiProperties.DEFAULT_ANALYTICS_PREDICTION_INTERVAL, TimeUnit.MILLISECONDS);
}

// Determine interval for querying past observations
final String queryInterval = nifiProperties.getProperty(NiFiProperties.ANALYTICS_QUERY_INTERVAL, NiFiProperties.DEFAULT_ANALYTICS_QUERY_INTERVAL);
long queryIntervalMillis;
try {
queryIntervalMillis = FormatUtils.getTimeDuration(queryInterval, TimeUnit.MILLISECONDS);
} catch (final Exception e) {
LOG.warn("Analytics is enabled however could not retrieve value for "+ NiFiProperties.ANALYTICS_QUERY_INTERVAL + ". This property has been set to '"
+ NiFiProperties.DEFAULT_ANALYTICS_QUERY_INTERVAL + "'");
queryIntervalMillis = FormatUtils.getTimeDuration(NiFiProperties.DEFAULT_ANALYTICS_QUERY_INTERVAL, TimeUnit.MILLISECONDS);
}

// Determine score name to use for evaluating model performance
String modelScoreName = nifiProperties.getProperty(NiFiProperties.ANALYTICS_CONNECTION_MODEL_SCORE_NAME, NiFiProperties.DEFAULT_ANALYTICS_CONNECTION_SCORE_NAME);

Expand All @@ -632,7 +643,7 @@ private FlowController(
.getConnectionStatusModelMap(extensionManager, nifiProperties);

analyticsEngine = new CachingConnectionStatusAnalyticsEngine(flowManager, componentStatusRepository, flowFileEventRepository, modelMap,
predictionIntervalMillis, modelScoreName, modelScoreThreshold);
predictionIntervalMillis, queryIntervalMillis, modelScoreName, modelScoreThreshold);
}

eventAccess = new StandardEventAccess(this, flowFileEventRepository);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,10 @@ public class CachingConnectionStatusAnalyticsEngine extends ConnectionStatusAnal

public CachingConnectionStatusAnalyticsEngine(FlowManager flowManager, ComponentStatusRepository statusRepository,
FlowFileEventRepository flowFileEventRepository, Map<String, Tuple<StatusAnalyticsModel, StatusMetricExtractFunction>> modelMap,
long predictionIntervalMillis, String scoreName, double scoreThreshold) {
long predictionIntervalMillis, long queryIntervalMillis, String scoreName, double scoreThreshold) {

super(flowManager,statusRepository,flowFileEventRepository,modelMap,predictionIntervalMillis,scoreName,scoreThreshold);
super(flowManager, statusRepository, flowFileEventRepository, modelMap, predictionIntervalMillis,
queryIntervalMillis, scoreName, scoreThreshold);
this.cache = Caffeine.newBuilder()
.expireAfterWrite(30, TimeUnit.MINUTES)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Optional;
import java.util.stream.Stream;

import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.flow.FlowManager;
Expand Down Expand Up @@ -56,6 +57,7 @@ public class ConnectionStatusAnalytics implements StatusAnalytics {
private final Boolean supportOnlineLearning;
private Boolean extendWindow = false;
private long intervalMillis = 3L * 60 * 1000; // Default is 3 minutes
private long queryIntervalMillis = 3L * 60 * 1000; //Default is 3 minutes
private String scoreName = "rSquared";
private double scoreThreshold = .90;

Expand All @@ -78,7 +80,7 @@ public void refresh() {
//Obtain latest observations when available, extend window if needed to obtain minimum observations
this.queryWindow = new QueryWindow(extendWindow ? queryWindow.getStartTimeMillis() : queryWindow.getEndTimeMillis(), System.currentTimeMillis());
} else {
this.queryWindow = new QueryWindow(System.currentTimeMillis() - getIntervalTimeMillis(), System.currentTimeMillis());
this.queryWindow = new QueryWindow(System.currentTimeMillis() - getQueryIntervalMillis(), System.currentTimeMillis());
}

modelMap.forEach((metric, modelFunction) -> {
Expand All @@ -94,6 +96,13 @@ public void refresh() {
try {
LOG.debug("Refreshing model with new data for connection id: {} ", connectionIdentifier);
model.learn(Stream.of(features), Stream.of(values));

if(MapUtils.isNotEmpty(model.getScores())){
model.getScores().forEach((key, value) -> {
LOG.debug("Model Scores for prediction metric {} for connection id {}: {}={} ", metric, connectionIdentifier, key, value);
});
}

extendWindow = false;
} catch (Exception ex) {
LOG.debug("Exception encountered while training model for connection id {}: {}", connectionIdentifier, ex.getMessage());
Expand Down Expand Up @@ -137,6 +146,7 @@ public Long getTimeToBytesBackpressureMillis() {
predictFeatures.put(1, inOutRatio);
return convertTimePrediction(bytesModel.predictVariable(0, predictFeatures, backPressureBytes), System.currentTimeMillis());
} else {
LOG.debug("Model is not valid for calculating time back pressure by content size in bytes. Returning -1");
return -1L;
}
}
Expand Down Expand Up @@ -164,6 +174,7 @@ public Long getTimeToCountBackpressureMillis() {
predictFeatures.put(1, inOutRatio);
return convertTimePrediction(countModel.predictVariable(0, predictFeatures, backPressureCountThreshold), System.currentTimeMillis());
} else {
LOG.debug("Model is not valid for calculating time to back pressure by object count. Returning -1");
return -1L;
}
}
Expand All @@ -186,6 +197,7 @@ public Long getNextIntervalBytes() {
predictFeatures.add(inOutRatio);
return convertCountPrediction(bytesModel.predict(predictFeatures.toArray(new Double[2])));
} else {
LOG.debug("Model is not valid for predicting content size in bytes for next interval. Returning -1");
return -1L;
}
}
Expand All @@ -208,6 +220,7 @@ public Long getNextIntervalCount() {
predictFeatures.add(inOutRatio);
return convertCountPrediction(countModel.predict(predictFeatures.toArray(new Double[2])));
} else {
LOG.debug("Model is not valid for predicting object count for next interval. Returning -1");
return -1L;
}

Expand Down Expand Up @@ -266,6 +279,14 @@ public void setIntervalTimeMillis(long intervalTimeMillis) {
this.intervalMillis = intervalTimeMillis;
}

public long getQueryIntervalMillis() {
return queryIntervalMillis;
}

public void setQueryIntervalMillis(long queryIntervalMillis) {
this.queryIntervalMillis = queryIntervalMillis;
}

public String getScoreName() {
return scoreName;
}
Expand Down Expand Up @@ -334,6 +355,7 @@ private FlowFileEvent getStatusReport() {
*/
private Long convertTimePrediction(Double prediction, Long timeMillis) {
if (Double.isNaN(prediction) || Double.isInfinite(prediction) || prediction < timeMillis) {
LOG.debug("Time prediction value is invalid: {}. Returning -1.",prediction);
return -1L;
} else {
return Math.max(0, Math.round(prediction) - timeMillis);
Expand All @@ -346,7 +368,8 @@ private Long convertTimePrediction(Double prediction, Long timeMillis) {
* @return prediction prediction value converted into valid value for consumption
*/
private Long convertCountPrediction(Double prediction) {
if (Double.isNaN(prediction) || Double.isInfinite(prediction) || prediction < 0) {
if (Double.isNaN(prediction) || Double.isInfinite(prediction)) {
LOG.debug("Count prediction value is invalid: {}. Returning -1.",prediction);
return -1L;
} else {
return Math.max(0, Math.round(prediction));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,19 @@ public class ConnectionStatusAnalyticsEngine implements StatusAnalyticsEngine {
protected final FlowFileEventRepository flowFileEventRepository;
protected final Map<String, Tuple<StatusAnalyticsModel, StatusMetricExtractFunction>> modelMap;
protected final long predictionIntervalMillis;
protected final long queryIntervalMillis;
protected final String scoreName;
protected final double scoreThreshold;

public ConnectionStatusAnalyticsEngine(FlowManager flowManager, ComponentStatusRepository statusRepository, FlowFileEventRepository flowFileEventRepository,
Map<String, Tuple<StatusAnalyticsModel, StatusMetricExtractFunction>> modelMap, long predictionIntervalMillis, String scoreName, double scoreThreshold) {
Map<String, Tuple<StatusAnalyticsModel, StatusMetricExtractFunction>> modelMap, long predictionIntervalMillis,
long queryIntervalMillis, String scoreName, double scoreThreshold) {
this.flowManager = flowManager;
this.statusRepository = statusRepository;
this.flowFileEventRepository = flowFileEventRepository;
this.predictionIntervalMillis = predictionIntervalMillis;
this.modelMap = modelMap;
this.queryIntervalMillis = queryIntervalMillis;
this.scoreName = scoreName;
this.scoreThreshold = scoreThreshold;
}
Expand All @@ -60,6 +63,7 @@ public ConnectionStatusAnalyticsEngine(FlowManager flowManager, ComponentStatusR
public StatusAnalytics getStatusAnalytics(String identifier) {
ConnectionStatusAnalytics connectionStatusAnalytics = new ConnectionStatusAnalytics(statusRepository, flowManager, flowFileEventRepository, modelMap, identifier, false);
connectionStatusAnalytics.setIntervalTimeMillis(predictionIntervalMillis);
connectionStatusAnalytics.setQueryIntervalMillis(queryIntervalMillis);
connectionStatusAnalytics.setScoreName(scoreName);
connectionStatusAnalytics.setScoreThreshold(scoreThreshold);
connectionStatusAnalytics.refresh();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,17 @@ public class TestCachingConnectionStatusAnalyticsEngine extends TestStatusAnalyt
public StatusAnalyticsEngine getStatusAnalyticsEngine(FlowManager flowManager, FlowFileEventRepository flowFileEventRepository,
ComponentStatusRepository componentStatusRepository,
Map<String, Tuple<StatusAnalyticsModel, StatusMetricExtractFunction>> modelMap,
long predictIntervalMillis, String scoreName, double scoreThreshold) {
long predictIntervalMillis, long queryIntervalMillis, String scoreName, double scoreThreshold) {

return new CachingConnectionStatusAnalyticsEngine(flowManager, componentStatusRepository, flowFileEventRepository, modelMap, predictIntervalMillis, scoreName, scoreThreshold);
return new CachingConnectionStatusAnalyticsEngine(flowManager, componentStatusRepository, flowFileEventRepository, modelMap, predictIntervalMillis,
queryIntervalMillis, scoreName, scoreThreshold);
}

@Test
public void testCachedStatusAnalytics() {
StatusAnalyticsEngine statusAnalyticsEngine = new CachingConnectionStatusAnalyticsEngine(flowManager, statusRepository, flowFileEventRepository, modelMap,
DEFAULT_PREDICT_INTERVAL_MILLIS, DEFAULT_SCORE_NAME, DEFAULT_SCORE_THRESHOLD);
DEFAULT_PREDICT_INTERVAL_MILLIS, DEFAULT_QUERY_INTERVAL_MILLIS,
DEFAULT_SCORE_NAME, DEFAULT_SCORE_THRESHOLD);
StatusAnalytics statusAnalyticsA = statusAnalyticsEngine.getStatusAnalytics("A");
StatusAnalytics statusAnalyticsB = statusAnalyticsEngine.getStatusAnalytics("B");
StatusAnalytics statusAnalyticsTest = statusAnalyticsEngine.getStatusAnalytics("A");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ public class TestConnectionStatusAnalyticsEngine extends TestStatusAnalyticsEngi
@Override
public StatusAnalyticsEngine getStatusAnalyticsEngine(FlowManager flowManager, FlowFileEventRepository flowFileEventRepository,
ComponentStatusRepository statusRepository, Map<String, Tuple<StatusAnalyticsModel, StatusMetricExtractFunction>> modelMap,
long predictIntervalMillis, String scoreName, double scoreThreshold) {
return new ConnectionStatusAnalyticsEngine(flowManager, statusRepository, flowFileEventRepository,modelMap, DEFAULT_PREDICT_INTERVAL_MILLIS, scoreName, scoreThreshold);
long predictIntervalMillis, long queryIntervalMillis, String scoreName, double scoreThreshold) {
return new ConnectionStatusAnalyticsEngine(flowManager, statusRepository, flowFileEventRepository,modelMap,
DEFAULT_PREDICT_INTERVAL_MILLIS, DEFAULT_QUERY_INTERVAL_MILLIS, scoreName, scoreThreshold);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
public abstract class TestStatusAnalyticsEngine {

static final long DEFAULT_PREDICT_INTERVAL_MILLIS = 3L * 60 * 1000;
static final long DEFAULT_QUERY_INTERVAL_MILLIS = 3L * 60 * 1000;
static final String DEFAULT_SCORE_NAME = "rSquared";
static final double DEFAULT_SCORE_THRESHOLD = .9;

Expand Down Expand Up @@ -89,13 +90,13 @@ public Tuple<Stream<Double[]>, Stream<Double>> answer(InvocationOnMock invocatio
@Test
public void testGetStatusAnalytics() {
StatusAnalyticsEngine statusAnalyticsEngine = getStatusAnalyticsEngine(flowManager,flowFileEventRepository, statusRepository, modelMap, DEFAULT_PREDICT_INTERVAL_MILLIS,
DEFAULT_SCORE_NAME, DEFAULT_SCORE_THRESHOLD);
DEFAULT_QUERY_INTERVAL_MILLIS, DEFAULT_SCORE_NAME, DEFAULT_SCORE_THRESHOLD);
StatusAnalytics statusAnalytics = statusAnalyticsEngine.getStatusAnalytics("1");
assertNotNull(statusAnalytics);
}

public abstract StatusAnalyticsEngine getStatusAnalyticsEngine(FlowManager flowManager, FlowFileEventRepository flowFileEventRepository,
ComponentStatusRepository componentStatusRepository, Map<String, Tuple<StatusAnalyticsModel, StatusMetricExtractFunction>> modelMap,
long predictIntervalMillis, String scoreName, double scoreThreshold);
long predictIntervalMillis, long queryIntervalMillis, String scoreName, double scoreThreshold);

}
Loading