Skip to content

Commit

Permalink
NIFI-6780: Add/fix docs, cleanup warnings, fixed some table definitions
Browse files Browse the repository at this point in the history
  • Loading branch information
mattyb149 committed Oct 18, 2019
1 parent 94f7ce0 commit 50ebe3e
Show file tree
Hide file tree
Showing 5 changed files with 211 additions and 27 deletions.
Expand Up @@ -79,12 +79,7 @@ public QueryResult query(final ReportingContext context, final String sql)

final CachedStatement cachedStatement = getStatement(sql, statementBuilder, statementQueues);
final PreparedStatement stmt = cachedStatement.getStatement();
final ResultSet rs;
try {
rs = stmt.executeQuery();
} catch (final Throwable t) {
throw t;
}
final ResultSet rs = stmt.executeQuery();

return new QueryResult() {
@Override
Expand Down Expand Up @@ -159,7 +154,6 @@ private CachedStatement buildCachedStatement(final String sql, final ReportingCo
final BulletinTable bulletinTable = new BulletinTable(context, getLogger());
rootSchema.add("BULLETINS", bulletinTable);

// TODO add the others
rootSchema.setCacheEnabled(false);

try {
Expand Down Expand Up @@ -216,5 +210,4 @@ public void closeQuietly(final AutoCloseable... closeables) {
private void onCacheEviction(final String key, final BlockingQueue<CachedStatement> queue, final RemovalCause cause) {
clearQueue(queue);
}

}
Expand Up @@ -28,7 +28,6 @@
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.record.sink.RecordSinkService;
import org.apache.nifi.reporting.AbstractReportingTask;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.reporting.ReportingInitializationContext;
import org.apache.nifi.serialization.record.ResultSetRecordSet;
Expand All @@ -42,14 +41,15 @@
import java.util.UUID;
import java.util.concurrent.TimeUnit;

@Tags({"status", "connection", "processor", "jvm", "metrics", "history", "bulletin", "sql"}) // TODO
@CapabilityDescription("Publishes NiFi status information based on the results of a user-specified SQL query.")
@Tags({"status", "connection", "processor", "jvm", "metrics", "history", "bulletin", "prediction", "process", "group", "record", "sql"})
@CapabilityDescription("Publishes NiFi status information based on the results of a user-specified SQL query. The query may make use of the CONNECTION_STATUS, PROCESSOR_STATUS, "
+ "BULLETINS, PROCESS_GROUP_STATUS, JVM_METRICS, or CONNECTION_STATUS_PREDICTIONS tables, and can use any functions or capabilities provided by Apache Calcite.")
public class QueryNiFiReportingTask extends AbstractReportingTask {

static final PropertyDescriptor RECORD_SINK = new PropertyDescriptor.Builder()
.name("sql-reporting-record-sink")
.displayName("Record Destination Service")
.description("Specifies the Controller Service to use for writing out the records to some destination.")
.description("Specifies the Controller Service to use for writing out the query result records to some destination.")
.identifiesControllerService(RecordSinkService.class)
.required(true)
.build();
Expand All @@ -58,7 +58,7 @@ public class QueryNiFiReportingTask extends AbstractReportingTask {
.name("sql-reporting-query")
.displayName("SQL Query")
.description("SQL SELECT statement specifies which tables to query and how data should be filtered/transformed. "
+ "SQL SELECT can select from the CONNECTION_STATUS,PROCESSOR_STATUS,BULLETINS or JVM_METRICS tables") // TODO
+ "SQL SELECT can select from the CONNECTION_STATUS, PROCESSOR_STATUS, BULLETINS, PROCESS_GROUP_STATUS, JVM_METRICS, or CONNECTION_STATUS_PREDICTIONS tables.")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(new SqlValidator())
Expand All @@ -67,22 +67,18 @@ public class QueryNiFiReportingTask extends AbstractReportingTask {
static final PropertyDescriptor INCLUDE_ZERO_RECORD_RESULTS = new PropertyDescriptor.Builder()
.name("sql-reporting-include-zero-record-results")
.displayName("Include Zero Record Results")
.description("When running the SQL statement, if the result has no data, "
+ "this property specifies whether or not a Site-to-Site message (flow file, e.g.) will be transmitted.")
.description("When running the SQL statement, if the result has no data, this property specifies whether or not the empty result set will be transmitted.")
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.allowableValues("true", "false")
.defaultValue("false")
.required(true)
.build();

private List<PropertyDescriptor> properties;

private volatile RecordSinkService recordSinkService;

private MetricsQueryService metricsQueryService;

@Override
protected void init(final ReportingInitializationContext config) throws InitializationException {
protected void init(final ReportingInitializationContext config) {
metricsQueryService = new MetricsSqlQueryService(getLogger());
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(QUERY);
Expand All @@ -93,14 +89,14 @@ protected void init(final ReportingInitializationContext config) throws Initiali

@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return this.properties;
return properties;
}

@Override
public void onTrigger(ReportingContext context) {
final StopWatch stopWatch = new StopWatch(true);
try {
recordSinkService = context.getProperty(RECORD_SINK).asControllerService(RecordSinkService.class);
final RecordSinkService recordSinkService = context.getProperty(RECORD_SINK).asControllerService(RecordSinkService.class);
final String sql = context.getProperty(QUERY).evaluateAttributeExpressions().getValue();
final QueryResult queryResult = metricsQueryService.query(context, sql);
final ResultSetRecordSet recordSet;
Expand Down
Expand Up @@ -180,13 +180,12 @@ public RelDataType getRowType(final RelDataTypeFactory typeFactory) {
types.add(typeFactory.createJavaType(int.class));
types.add(typeFactory.createJavaType(int.class));
types.add(typeFactory.createJavaType(int.class));
types.add(typeFactory.createJavaType(int.class));
types.add(typeFactory.createJavaType(long.class));
types.add(typeFactory.createJavaType(double.class));
types.add(typeFactory.createJavaType(String.class));
types.add(typeFactory.createJavaType(long.class));
types.add(typeFactory.createJavaType(long.class));
types.add(typeFactory.createJavaType(long.class));

types.add(typeFactory.createJavaType(double.class));
types.add(typeFactory.createJavaType(double.class));
types.add(typeFactory.createJavaType(double.class));

// Add fields for the garbage collectors
metricsService.getMetrics(virtualMachineMetrics);
Expand Down
Expand Up @@ -84,7 +84,7 @@ public void close() {
public Enumerable<Object> project(final int[] fields) {
return new AbstractEnumerable<Object>() {
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
@SuppressWarnings({"rawtypes"})
public Enumerator<Object> enumerator() {
final ProcessorStatusEnumerator processorStatusEnumerator = new ProcessorStatusEnumerator(context, logger, fields) {
@Override
Expand Down Expand Up @@ -184,6 +184,7 @@ public RelDataType getRowType(final RelDataTypeFactory typeFactory) {
typeFactory.createJavaType(long.class),
typeFactory.createJavaType(long.class),
typeFactory.createJavaType(long.class),
typeFactory.createJavaType(long.class),
typeFactory.createJavaType(int.class),
typeFactory.createJavaType(int.class),
typeFactory.createJavaType(int.class),
Expand Down
@@ -0,0 +1,195 @@
<!DOCTYPE html>
<html lang="en">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8" />
<title>Groovy</title>
<!--link rel="stylesheet" href="../../css/component-usage.css" type="text/css" /-->
<link rel="stylesheet" href="/nifi-docs/css/component-usage.css" type="text/css" />
</head>

<body>
<h2>Summary</h2>
<p>
This reporting task can be used to issue SQL queries against various NiFi metrics information, modeled as tables,
and transmit the query results to some specified destination. The query may make use of the CONNECTION_STATUS,
PROCESSOR_STATUS, BULLETINS, PROCESS_GROUP_STATUS, JVM_METRICS, or CONNECTION_STATUS_PREDICTIONS tables, and can
use any functions or capabilities provided by <a href="https://calcite.apache.org/">Apache Calcite</a>, including
JOINs, aggregate functions, etc.
</p>
<p>
The results are transmitted to the destination using the configured Record Sink service, such as
SiteToSiteReportingRecordSink (for sending via the Site-to-Site protocol) or DatabaseRecordSink (for sending the
query result rows to an relational database).
</p>
<br/>
<h2>Table Definitions</h2>
<p>
Below is a list of definitions for all the "tables" supported by this reporting task. Note that these are not
persistent/materialized tables, rather they are non-materialized views for which the sources are re-queried at
every execution. This means that a query executed twice may return different results, for example if new status
information is available, or in the case of JVM_METRICS (for example), a new snapshot of the JVM at query-time.
</p>
<br/>
<h3>CONNECTION_STATUS</h3>
<table title="CONNECTION_STATUS" border="1" width="500">
<tr><th>Column</th><th>Data Type</th></tr>
<tr><td>id</td><td>String</td></tr>
<tr><td>groupId</td><td>String</td></tr>
<tr><td>name</td><td>String</td></tr>
<tr><td>sourceId</td><td>String</td></tr>
<tr><td>sourceName</td><td>String</td></tr>
<tr><td>destinationId</td><td>String</td></tr>
<tr><td>destinationName</td><td>String</td></tr>
<tr><td>backPressureDataSizeThreshold</td><td>String</td></tr>
<tr><td>backPressureBytesThreshold</td><td>long</td></tr>
<tr><td>backPressureObjectThreshold</td><td>long</td></tr>
<tr><td>isBackPressureEnabled</td><td>boolean</td></tr>
<tr><td>inputCount</td><td>int</td></tr>
<tr><td>inputBytes</td><td>long</td></tr>
<tr><td>queuedCount</td><td>int</td></tr>
<tr><td>queuedBytes</td><td>long</td></tr>
<tr><td>outputCount</td><td>int</td></tr>
<tr><td>outputBytes</td><td>long</td></tr>
<tr><td>maxQueuedCount</td><td>int</td></tr>
<tr><td>maxQueuedBytes</td><td>long</td></tr>
</table>
<br/>
<h3>PROCESSOR_STATUS</h3>
<table title="PROCESSOR_STATUS" border="1" width="500">
<tr><th>Column</th><th>Data Type</th></tr>
<tr><td>id</td><td>String</td></tr>
<tr><td>groupId</td><td>String</td></tr>
<tr><td>name</td><td>String</td></tr>
<tr><td>processorType</td><td>String</td></tr>
<tr><td>averageLineageDuration</td><td>long</td></tr>
<tr><td>bytesRead</td><td>long</td></tr>
<tr><td>bytesWritten</td><td>long</td></tr>
<tr><td>bytesReceived</td><td>long</td></tr>
<tr><td>bytesSent</td><td>long</td></tr>
<tr><td>flowFilesRemoved</td><td>int</td></tr>
<tr><td>flowFilesReceived</td><td>int</td></tr>
<tr><td>flowFilesSent</td><td>int</td></tr>
<tr><td>inputCount</td><td>int</td></tr>
<tr><td>inputBytes</td><td>long</td></tr>
<tr><td>outputCount</td><td>int</td></tr>
<tr><td>outputBytes</td><td>long</td></tr>
<tr><td>activeThreadCount</td><td>int</td></tr>
<tr><td>terminatedThreadCount</td><td>int</td></tr>
<tr><td>invocations</td><td>int</td></tr>
<tr><td>processingNanos</td><td>long</td></tr>
<tr><td>runStatus</td><td>String</td></tr>
<tr><td>executionNode</td><td>String</td></tr>
</table>
<br/>
<h3>BULLETINS</h3>
<table title="BULLETINS" border="1" width="500">
<tr><th>Column</th><th>Data Type</th></tr>
<tr><td>bulletinId</td><td>long</td></tr>
<tr><td>bulletinCategory</td><td>String</td></tr>
<tr><td>bulletinGroupId</td><td>String</td></tr>
<tr><td>bulletinGroupName</td><td>String</td></tr>
<tr><td>bulletinLevel</td><td>String</td></tr>
<tr><td>bulletinMessage</td><td>String</td></tr>
<tr><td>bulletinNodeAddress</td><td>String</td></tr>
<tr><td>bulletinNodeId</td><td>String</td></tr>
<tr><td>bulletinSourceId</td><td>String</td></tr>
<tr><td>bulletinSourceName</td><td>String</td></tr>
<tr><td>bulletinSourceType</td><td>String</td></tr>
<tr><td>bulletinTimestamp</td><td>Date</td></tr>
</table>
<br/>
<h3>PROCESS_GROUP_STATUS</h3>
<table title="PROCESS_GROUP_STATUS" border="1" width="500">
<tr><th>Column</th><th>Data Type</th></tr>
<tr><td>id</td><td>String</td></tr>
<tr><td>groupId</td><td>String</td></tr>
<tr><td>name</td><td>String</td></tr>
<tr><td>bytesRead</td><td>long</td></tr>
<tr><td>bytesWritten</td><td>long</td></tr>
<tr><td>bytesReceived</td><td>long</td></tr>
<tr><td>bytesSent</td><td>long</td></tr>
<tr><td>bytesTransferred</td><td>long</td></tr>
<tr><td>flowFilesReceived</td><td>int</td></tr>
<tr><td>flowFilesSent</td><td>int</td></tr>
<tr><td>flowFilesTransferred</td><td>int</td></tr>
<tr><td>inputContentSize</td><td>long</td></tr>
<tr><td>inputCount</td><td>int</td></tr>
<tr><td>outputContentSize</td><td>long</td></tr>
<tr><td>outputCount</td><td>int</td></tr>
<tr><td>queuedContentSize</td><td>long</td></tr>
<tr><td>activeThreadCount</td><td>int</td></tr>
<tr><td>terminatedThreadCount</td><td>int</td></tr>
<tr><td>queuedCount</td><td>int</td></tr>
<tr><td>versionedFlowState</td><td>String</td></tr>
</table>
<br/>
<h3>JVM_METRICS</h3>
<p>
The JVM_METRICS table has dynamic columns in the sense that the "garbage collector runs" and
"garbage collector time columns" appear for each Java garbage collector in the JVM.
<br/>
The column names end with the name of the garbage collector substituted for the
<code>&lt;garbage_collector_name&gt;</code> expression below:
</p>
<table title="JVM_METRICS" border="1" width="500">
<tr><th>Column</th><th>Data Type</th></tr>
<tr><td>jvm_daemon_thread_count</td><td>int</td></tr>
<tr><td>jvm_thread_count</td><td>int</td></tr>
<tr><td>jvm_thread_states_blocked</td><td>int</td></tr>
<tr><td>jvm_thread_states_runnable</td><td>int</td></tr>
<tr><td>jvm_thread_states_terminated</td><td>int</td></tr>
<tr><td>jvm_thread_states_timed_waiting</td><td>int</td></tr>
<tr><td>jvm_uptime</td><td>long</td></tr>
<tr><td>jvm_head_used</td><td>double</td></tr>
<tr><td>jvm_heap_usage</td><td>double</td></tr>
<tr><td>jvm_non_heap_usage</td><td>double</td></tr>
<tr><td>jvm_file_descriptor_usage</td><td>double</td></tr>
<tr><td>jvm_gc_runs_<code>&lt;garbage_collector_name&gt;</code></td><td>long</td></tr>
<tr><td>jvm_gc_time_<code>&lt;garbage_collector_name&gt;</code></td><td>long</td></tr>
</table>
<br/>
<h3>CONNECTION_STATUS_PREDICTIONS</h3>
<table title="CONNECTION_STATUS_PREDICTIONS" border="1" width="500">
<tr><th>Column</th><th>Data Type</th></tr>
<tr><td>connectionId</td><td>String</td></tr>
<tr><td>predictedQueuedBytes</td><td>long</td></tr>
<tr><td>predictedQueuedCount</td><td>int</td></tr>
<tr><td>predictedPercentBytes</td><td>int</td></tr>
<tr><td>predictedPercentCount</td><td>int</td></tr>
<tr><td>predictedTimeToBytesBackpressureMillis</td><td>long</td></tr>
<tr><td>predictedTimeToCountBackpressureMillis</td><td>long</td></tr>
<tr><td>predictionIntervalMillis</td><td>long</td></tr>
</table>
<br/><br/>
<h2>SQL Query Examples</h2>
<p>
<b>Example:</b> Select all fields from the <code>CONNECTION_STATUS</code> table:<br/>
<pre>SELECT * FROM CONNECTION_STATUS</pre>
</p>
<br/>
<p>
<b>Example:</b> Select connection IDs where time-to-backpressure (based on queue count) is less than 5 minutes:<br/>
<pre>SELECT connectionId FROM CONNECTION_STATUS_PREDICTIONS WHERE predictedTimeToCountBackpressureMillis < 300000</pre>
</p>
<br/>
<p>
<b>Example:</b> Get the unique bulletin categories associated with errors:<br/>
<pre>SELECT DISTINCT(bulletinCategory) FROM BULLETINS WHERE bulletinLevel = "ERROR"</pre>
</p>
<br/>
</body>
</html>

0 comments on commit 50ebe3e

Please sign in to comment.