Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa

static final PropertyDescriptor PLATFORM = new PropertyDescriptor.Builder()
.name("Platform")
.description("The value to use for the platform field in each provenance event.")
.description("The value to use for the platform field in each status record.")
.required(true)
.expressionLanguageSupported(true)
.defaultValue("nifi")
Expand Down Expand Up @@ -179,7 +179,7 @@ public void onTrigger(final ReportingContext context) {
toIndex = Math.min(fromIndex + batchSize, jsonArray.size());
jsonBatch = jsonArray.subList(fromIndex, toIndex);
} catch (final IOException e) {
throw new ProcessException("Failed to send Provenance Events to destination due to IOException:" + e.getMessage(), e);
throw new ProcessException("Failed to send Status Records to destination due to IOException:" + e.getMessage(), e);
}
}
}
Expand Down Expand Up @@ -343,6 +343,10 @@ void serializeConnectionStatus(final JsonArrayBuilder arrayBuilder, final JsonBu
addField(builder, "inputCount", status.getInputCount());
addField(builder, "outputBytes", status.getOutputBytes());
addField(builder, "outputCount", status.getOutputCount());
addField(builder, "backPressureBytesThreshold", status.getBackPressureBytesThreshold());
addField(builder, "backPressureObjectThreshold", status.getBackPressureObjectThreshold());
addField(builder, "isBackPressureEnabled", Boolean.toString((status.getBackPressureObjectThreshold() > 0 && status.getBackPressureObjectThreshold() <= status.getQueuedCount())
|| (status.getBackPressureBytesThreshold() > 0 && status.getBackPressureBytesThreshold() <= status.getMaxQueuedBytes())));

arrayBuilder.add(builder.build());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,24 @@ public void testComponentTypeFilter() throws IOException, InitializationExceptio
assertEquals(pgStatus.getId(), componentId.getString());
}

@Test
public void testConnectionStatus() throws IOException, InitializationException {
final ProcessGroupStatus pgStatus = generateProcessGroupStatus("root", "Awesome", 1, 0);

final Map<PropertyDescriptor, String> properties = new HashMap<>();
properties.put(SiteToSiteStatusReportingTask.BATCH_SIZE, "4");
properties.put(SiteToSiteStatusReportingTask.COMPONENT_NAME_FILTER_REGEX, "Awesome.*");
properties.put(SiteToSiteStatusReportingTask.COMPONENT_TYPE_FILTER_REGEX, "(Connection)");

MockSiteToSiteStatusReportingTask task = initTask(properties, pgStatus);
task.onTrigger(context);

final String msg = new String(task.dataSent.get(0), StandardCharsets.UTF_8);
JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes()));
JsonString backpressure = jsonReader.readArray().getJsonObject(0).getJsonString("isBackPressureEnabled");
assertEquals("true", backpressure.getString());
}

@Test
public void testComponentNameFilter() throws IOException, InitializationException {
final ProcessGroupStatus pgStatus = generateProcessGroupStatus("root", "Awesome", 1, 0);
Expand Down