Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion nifi-docs/src/main/asciidoc/administration-guide.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -211,11 +211,24 @@ Component logs provide the following MDC named values:
- `processGroupName` contains the name of the Process Group
- `processGroupNamePath` contains of the hierarchy of names for Process Groups with separators

Components that run inside a Connector-managed flow also carry framework-supplied MDC values that identify the owning
Connector:

- `connectorId` contains the UUID of the Connector
- `connectorName` contains the user-visible name of the Connector
- `connectorComponent` contains the fully qualified class name of the Connector implementation
- `connectorBundleGroup`, `connectorBundleArtifact`, `connectorBundleVersion` identify the NAR bundle the Connector was
loaded from

A Connector may also supply additional, implementation-specific MDC values via
`ConnectorInitializationContext.setLoggingAttributes(Map)`. Keys reserved by the framework (those listed above) cannot
be overridden; attempts to do so are dropped and logged as a `WARN`.

MDC named values can be added to a Logback pattern layout using the `mdc` conversion word.

[source, xml]
----
<pattern>%date %level [%thread] %mdc{processGroupId} %logger{40} %msg%n</pattern>
<pattern>%date %level [%thread] %mdc{connectorId} %mdc{processGroupId} %logger{40} %msg%n</pattern>
----

Logs from classes other than extension components do not have MDC named values. Logs formatted using the pattern layout
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ public final class StandardProcessGroup implements ProcessGroup {
private static final String UNREGISTERED_PATH_SEGMENT = "UNREGISTERED";

private final Map<String, String> loggingAttributes = new ConcurrentHashMap<>();
private volatile Map<String, String> connectorLoggingAttributes = Map.of();
private volatile String logFileSuffix;

public StandardProcessGroup(final String id, final ControllerServiceProvider serviceProvider, final ProcessScheduler scheduler,
Expand Down Expand Up @@ -303,6 +304,14 @@ public ProcessGroup getParent() {
@Override
public void setParent(final ProcessGroup newParent) {
parent.set(newParent);
// Inherit connector-supplied MDC attributes from the new parent. Descendants of a connector's
// managed flow root must carry the same connectorId/connectorName/etc. so that logs and OTel
// metrics emitted by processors inside the connector flow can be attributed to the connector.
// This runs each time the PG is re-parented (including initial attach), so new PGs added to
// an existing connector flow inherit automatically.
if (newParent instanceof StandardProcessGroup standardParent) {
this.connectorLoggingAttributes = standardParent.connectorLoggingAttributes;
}
setLoggingAttributes();
}

Expand Down Expand Up @@ -4705,6 +4714,47 @@ private void setLoggingAttributes() {
final String registeredFlowVersion = currentVersionControl.getVersion();
loggingAttributes.put(LoggingAttribute.REGISTERED_FLOW_VERSION.attribute, registeredFlowVersion);
}

// Merge connector-supplied attributes last so they are exposed to MDC/OTel snapshots taken
// from this PG. PG-level keys are added first to avoid being shadowed in case a connector
// ever tried to override them; the reserved-key filter on the connector side also enforces
// this separation defensively.
loggingAttributes.putAll(connectorLoggingAttributes);
}

/**
* Stores the connector-managed MDC attributes for this process group and cascades the same
* attributes to all descendant process groups so that components anywhere in the connector's
* managed flow log with consistent connectorId/connectorName/etc. context.
*
* <p>This method is called by {@code StandardConnectorNode} against its managed root process
* group whenever the connector's framework keys (e.g. {@code connectorName}) change or when the
* connector provides updated custom logging attributes. Newly created descendant groups will
* also inherit the attributes lazily via {@link #setParent(ProcessGroup)}.</p>
*
* @param attributes the merged set of connector logging attributes; an empty or {@code null}
* map clears any previously assigned attributes
*/
public void setConnectorLoggingAttributes(final Map<String, String> attributes) {
final Map<String, String> snapshot = (attributes == null || attributes.isEmpty())
? Map.of()
: Map.copyOf(attributes);
this.connectorLoggingAttributes = snapshot;
setLoggingAttributes();

for (final ProcessGroup child : getProcessGroups()) {
if (child instanceof StandardProcessGroup standardChild) {
standardChild.setConnectorLoggingAttributes(snapshot);
}
}
}

/**
* Returns the connector-supplied MDC attributes assigned to this process group, if any. Returns
* an empty map when this PG is not inside a connector-managed flow.
*/
public Map<String, String> getConnectorLoggingAttributes() {
return connectorLoggingAttributes;
}

private void setGroupPath() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,11 @@ ProcessGroupStatus getGroupStatus(final ProcessGroup group, final RepositoryStat
status.setBytesTransferred(bytesTransferred);
status.setProcessingNanos(processingNanos);
status.setProcessingPerformanceStatus(performanceStatus);
// Snapshot the PG's MDC logging attributes onto the status DTO so reporting tasks (e.g. the
// OpenTelemetry reporting task) can attribute metrics emitted for this PG to its owning
// connector. For non-connector PGs this is just the existing processGroupId/Name/path keys;
// for PGs inside a connector-managed flow it also carries connectorId/connectorName/etc.
status.setLoggingAttributes(group.getLoggingAttributes());

final VersionControlInformation vci = group.getVersionControlInformation();
if (vci != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -238,4 +239,110 @@ void testGetLoggingAttributesWithVersionControlInformation() {

assertEquals(expected, loggingAttributes);
}

@Test
void testSetConnectorLoggingAttributesMergesIntoLoggingAttributes() {
processGroup.setName(NAME);

final Map<String, String> connectorAttributes = Map.of(
"connectorId", "connector-1",
"connectorName", "My Connector",
"connectorComponent", "com.example.MyConnector"
);

processGroup.setConnectorLoggingAttributes(connectorAttributes);

final Map<String, String> loggingAttributes = processGroup.getLoggingAttributes();
assertEquals("connector-1", loggingAttributes.get("connectorId"));
assertEquals("My Connector", loggingAttributes.get("connectorName"));
assertEquals("com.example.MyConnector", loggingAttributes.get("connectorComponent"));
assertEquals(NAME, loggingAttributes.get(StandardProcessGroup.LoggingAttribute.PROCESS_GROUP_NAME.getAttribute()));
assertEquals(ID, loggingAttributes.get(StandardProcessGroup.LoggingAttribute.PROCESS_GROUP_ID.getAttribute()));
}

@Test
void testSetConnectorLoggingAttributesCascadesToChildProcessGroups() {
processGroup.setName(NAME);

final StandardProcessGroup child = createStandardProcessGroup("child-id");
child.setName("Child");
processGroup.addProcessGroup(child);

final Map<String, String> connectorAttributes = Map.of(
"connectorId", "connector-1",
"connectorName", "Postgres CDC"
);

processGroup.setConnectorLoggingAttributes(connectorAttributes);

assertEquals("connector-1", child.getLoggingAttributes().get("connectorId"));
assertEquals("Postgres CDC", child.getLoggingAttributes().get("connectorName"));
}

@Test
void testAddProcessGroupInheritsConnectorLoggingAttributesFromParent() {
processGroup.setName(NAME);
processGroup.setConnectorLoggingAttributes(Map.of(
"connectorId", "connector-1",
"connectorName", "Postgres CDC"
));

final StandardProcessGroup lateChild = createStandardProcessGroup("late-child-id");
lateChild.setName("Late Child");
processGroup.addProcessGroup(lateChild);

assertEquals("connector-1", lateChild.getLoggingAttributes().get("connectorId"));
assertEquals("Postgres CDC", lateChild.getLoggingAttributes().get("connectorName"));
assertEquals(Map.copyOf(processGroup.getConnectorLoggingAttributes()), lateChild.getConnectorLoggingAttributes());
}

@Test
void testEmptyConnectorLoggingAttributesAddsNothing() {
processGroup.setName(NAME);
processGroup.setConnectorLoggingAttributes(Map.of());

final Map<String, String> loggingAttributes = processGroup.getLoggingAttributes();
assertFalse(loggingAttributes.containsKey("connectorId"));
assertTrue(processGroup.getConnectorLoggingAttributes().isEmpty());
// Verify the PG-level keys are still present.
assertEquals(NAME, loggingAttributes.get(StandardProcessGroup.LoggingAttribute.PROCESS_GROUP_NAME.getAttribute()));
}

@Test
void testSetConnectorLoggingAttributesReplacesPreviousValues() {
processGroup.setName(NAME);
processGroup.setConnectorLoggingAttributes(Map.of(
"connectorId", "connector-1",
"connectorName", "Old Name",
"customKey", "customValue"
));

processGroup.setConnectorLoggingAttributes(Map.of(
"connectorId", "connector-1",
"connectorName", "New Name"
));

final Map<String, String> loggingAttributes = processGroup.getLoggingAttributes();
assertEquals("New Name", loggingAttributes.get("connectorName"));
assertFalse(loggingAttributes.containsKey("customKey"));
}

private StandardProcessGroup createStandardProcessGroup(final String id) {
return new StandardProcessGroup(
id,
controllerServiceProvider,
processScheduler,
propertyEncryptor,
extensionManager,
stateManagerProvider,
flowManager,
reloadComponent,
nodeTypeProvider,
properties,
statelessGroupNodeFactory,
assetManager,
null
);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import org.apache.nifi.components.connector.secrets.SecretsManager;
import org.apache.nifi.logging.ComponentLog;

import java.util.Map;
import java.util.function.Consumer;

public interface FrameworkConnectorInitializationContextBuilder {

FrameworkConnectorInitializationContextBuilder identifier(String identifier);
Expand All @@ -35,5 +38,14 @@ public interface FrameworkConnectorInitializationContextBuilder {

FrameworkConnectorInitializationContextBuilder componentBundleLookup(ComponentBundleLookup bundleLookup);

/**
* Registers a callback invoked when the connector calls
* {@link ConnectorInitializationContext#setLoggingAttributes(Map)}. The framework uses this to
* forward custom MDC logging attributes into the owning {@code StandardConnectorNode} so they
* are merged with the framework-managed connector keys and propagated to the connector's
* managed flow.
*/
FrameworkConnectorInitializationContextBuilder loggingAttributesConsumer(Consumer<Map<String, String>> loggingAttributesConsumer);

FrameworkConnectorInitializationContext build();
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.nifi.logging.ComponentLog;

import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class StandardConnectorInitializationContext implements FrameworkConnectorInitializationContext {
Expand All @@ -37,6 +38,7 @@ public class StandardConnectorInitializationContext implements FrameworkConnecto
private final SecretsManager secretsManager;
private final AssetManager assetManager;
private final ComponentBundleLookup componentBundleLookup;
private final Consumer<Map<String, String>> loggingAttributesConsumer;


protected StandardConnectorInitializationContext(final Builder builder) {
Expand All @@ -46,6 +48,7 @@ protected StandardConnectorInitializationContext(final Builder builder) {
this.secretsManager = builder.secretsManager;
this.assetManager = builder.assetManager;
this.componentBundleLookup = builder.componentBundleLookup;
this.loggingAttributesConsumer = builder.loggingAttributesConsumer;
}

@Override
Expand Down Expand Up @@ -78,6 +81,14 @@ public AssetManager getAssetManager() {
return assetManager;
}

@Override
public void setLoggingAttributes(final Map<String, String> attributes) {
if (loggingAttributesConsumer == null) {
throw new UnsupportedOperationException("setLoggingAttributes is not supported by this initialization context");
}
loggingAttributesConsumer.accept(attributes);
}

@Override
public void updateFlow(final FlowContext flowContext, final VersionedExternalFlow versionedExternalFlow,
final BundleCompatibility bundleCompatability) throws FlowUpdateException {
Expand Down Expand Up @@ -160,6 +171,7 @@ public static class Builder implements FrameworkConnectorInitializationContextBu
private SecretsManager secretsManager;
private AssetManager assetManager;
private ComponentBundleLookup componentBundleLookup;
private Consumer<Map<String, String>> loggingAttributesConsumer;

@Override
public Builder identifier(final String identifier) {
Expand Down Expand Up @@ -197,6 +209,12 @@ public Builder componentBundleLookup(final ComponentBundleLookup bundleLookup) {
return this;
}

@Override
public Builder loggingAttributesConsumer(final Consumer<Map<String, String>> loggingAttributesConsumer) {
this.loggingAttributesConsumer = loggingAttributesConsumer;
return this;
}

@Override
public StandardConnectorInitializationContext build() {
return new StandardConnectorInitializationContext(this);
Expand Down
Loading
Loading