Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NIFI-6201 Suppressed stacktraces for expected error conditions (conne… #3427

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@
*/
package org.apache.nifi.controller;

import java.lang.reflect.Proxy;
import java.net.URL;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.commons.lang3.ClassUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.configuration.DefaultSettings;
Expand Down Expand Up @@ -56,13 +62,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.Proxy;
import java.net.URL;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class ExtensionBuilder {
private static final Logger logger = LoggerFactory.getLogger(ExtensionBuilder.class);

Expand Down Expand Up @@ -190,7 +189,11 @@ public ProcessorNode buildProcessor() {
try {
loggableComponent = createLoggableProcessor();
} catch (final ProcessorInstantiationException pie) {
logger.error("Could not create Processor of type " + type + " for ID " + identifier + "; creating \"Ghost\" implementation", pie);
logger.error("Could not create Processor of type " + type + " for ID " + identifier + " due to: " + pie.getMessage() + "; creating \"Ghost\" implementation");
if (logger.isDebugEnabled()) {
logger.debug(pie.getMessage(), pie);
}

final GhostProcessor ghostProc = new GhostProcessor();
ghostProc.setIdentifier(identifier);
ghostProc.setCanonicalClassName(type);
Expand Down Expand Up @@ -280,7 +283,11 @@ public ControllerServiceNode buildControllerService() {
try {
return createControllerServiceNode();
} catch (final Exception e) {
logger.error("Could not create Controller Service of type " + type + " for ID " + identifier + "; creating \"Ghost\" implementation", e);
logger.error("Could not create Controller Service of type " + type + " for ID " + identifier + " due to: " + e.getMessage() + "; creating \"Ghost\" implementation");
if (logger.isDebugEnabled()) {
logger.debug(e.getMessage(), e);
}

return createGhostControllerServiceNode();
}
}
Expand All @@ -293,12 +300,12 @@ private ProcessorNode createProcessorNode(final LoggableComponent<Processor> pro
final ProcessorNode procNode;
if (creationSuccessful) {
procNode = new StandardProcessorNode(processor, identifier, validationContextFactory, processScheduler, serviceProvider,
componentVarRegistry, reloadComponent, extensionManager, validationTrigger);
componentVarRegistry, reloadComponent, extensionManager, validationTrigger);
} else {
final String simpleClassName = type.contains(".") ? StringUtils.substringAfterLast(type, ".") : type;
final String componentType = "(Missing) " + simpleClassName;
procNode = new StandardProcessorNode(processor, identifier, validationContextFactory, processScheduler, serviceProvider,
componentType, type, componentVarRegistry, reloadComponent, extensionManager, validationTrigger, true);
componentType, type, componentVarRegistry, reloadComponent, extensionManager, validationTrigger, true);
}

applyDefaultSettings(procNode);
Expand All @@ -312,14 +319,14 @@ private ReportingTaskNode createReportingTaskNode(final LoggableComponent<Report
final ReportingTaskNode taskNode;
if (creationSuccessful) {
taskNode = new StandardReportingTaskNode(reportingTask, identifier, flowController, processScheduler,
validationContextFactory, componentVarRegistry, reloadComponent, extensionManager, validationTrigger);
validationContextFactory, componentVarRegistry, reloadComponent, extensionManager, validationTrigger);
taskNode.setName(taskNode.getReportingTask().getClass().getSimpleName());
} else {
final String simpleClassName = type.contains(".") ? StringUtils.substringAfterLast(type, ".") : type;
final String componentType = "(Missing) " + simpleClassName;

taskNode = new StandardReportingTaskNode(reportingTask, identifier, flowController, processScheduler, validationContextFactory,
componentType, type, componentVarRegistry, reloadComponent, extensionManager, validationTrigger, true);
componentType, type, componentVarRegistry, reloadComponent, extensionManager, validationTrigger, true);
taskNode.setName(componentType);
}

Expand Down Expand Up @@ -374,7 +381,7 @@ private ControllerServiceNode createControllerServiceNode() throws ClassNotFound

final StateManager stateManager = stateManagerProvider.getStateManager(identifier);
final ControllerServiceInitializationContext initContext = new StandardControllerServiceInitializationContext(identifier, terminationAwareLogger,
serviceProvider, stateManager, kerberosConfig);
serviceProvider, stateManager, kerberosConfig);
serviceImpl.initialize(initContext);

final LoggableComponent<ControllerService> originalLoggableComponent = new LoggableComponent<>(serviceImpl, bundleCoordinate, terminationAwareLogger);
Expand All @@ -383,7 +390,7 @@ private ControllerServiceNode createControllerServiceNode() throws ClassNotFound
final ComponentVariableRegistry componentVarRegistry = new StandardComponentVariableRegistry(this.variableRegistry);
final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(serviceProvider, componentVarRegistry);
final ControllerServiceNode serviceNode = new StandardControllerServiceNode(originalLoggableComponent, proxiedLoggableComponent, invocationHandler,
identifier, validationContextFactory, serviceProvider, componentVarRegistry, reloadComponent, extensionManager, validationTrigger);
identifier, validationContextFactory, serviceProvider, componentVarRegistry, reloadComponent, extensionManager, validationTrigger);
serviceNode.setName(rawClass.getSimpleName());

invocationHandler.setServiceNode(serviceNode);
Expand All @@ -407,7 +414,7 @@ private ControllerServiceNode createGhostControllerServiceNode() {
final ComponentVariableRegistry componentVarRegistry = new StandardComponentVariableRegistry(this.variableRegistry);
final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(serviceProvider, variableRegistry);
final ControllerServiceNode serviceNode = new StandardControllerServiceNode(proxiedLoggableComponent, proxiedLoggableComponent, invocationHandler, identifier,
validationContextFactory, serviceProvider, componentType, type, componentVarRegistry, reloadComponent, extensionManager, validationTrigger, true);
validationContextFactory, serviceProvider, componentType, type, componentVarRegistry, reloadComponent, extensionManager, validationTrigger, true);

return serviceNode;
}
Expand All @@ -417,7 +424,7 @@ private LoggableComponent<Processor> createLoggableProcessor() throws ProcessorI
final LoggableComponent<Processor> processorComponent = createLoggableComponent(Processor.class);

final ProcessorInitializationContext initiContext = new StandardProcessorInitializationContext(identifier, processorComponent.getLogger(),
serviceProvider, nodeTypeProvider, kerberosConfig);
serviceProvider, nodeTypeProvider, kerberosConfig);
processorComponent.getComponent().initialize(initiContext);

return processorComponent;
Expand All @@ -433,7 +440,7 @@ private LoggableComponent<ReportingTask> createLoggableReportingTask() throws Re

final String taskName = taskComponent.getComponent().getClass().getSimpleName();
final ReportingInitializationContext config = new StandardReportingInitializationContext(identifier, taskName,
SchedulingStrategy.TIMER_DRIVEN, "1 min", taskComponent.getLogger(), serviceProvider, kerberosConfig, nodeTypeProvider);
SchedulingStrategy.TIMER_DRIVEN, "1 min", taskComponent.getLogger(), serviceProvider, kerberosConfig, nodeTypeProvider);

taskComponent.getComponent().initialize(config);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,23 @@
*/
package org.apache.nifi.controller.service;

import java.lang.reflect.InvocationTargetException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.Restricted;
import org.apache.nifi.annotation.documentation.DeprecationNotice;
Expand Down Expand Up @@ -51,24 +68,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.InvocationTargetException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class StandardControllerServiceNode extends AbstractComponentNode implements ControllerServiceNode {

private static final Logger LOG = LoggerFactory.getLogger(StandardControllerServiceNode.class);
Expand Down Expand Up @@ -288,7 +287,7 @@ public void verifyCanDelete() {

@Override
public void verifyCanDisable() {
verifyCanDisable(Collections.<ControllerServiceNode>emptySet());
verifyCanDisable(Collections.emptySet());
}

@Override
Expand Down Expand Up @@ -571,4 +570,21 @@ public void setVersionedComponentId(final String versionedComponentId) {
}
}
}

@Override
public String toString() {
String bundleCoordinate;
try {
bundleCoordinate = controllerServiceHolder.get().getBundleCoordinate().toString();
} catch (NullPointerException e) {
bundleCoordinate = "null";
}
return "StandardControllerServiceNode{" +
"controllerServiceHolder=" + bundleCoordinate +
", versionedComponentId=" + versionedComponentId +
", comment='" + comment + '\'' +
", processGroup=" + processGroup +
", active=" + active +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,7 @@
*/
package org.apache.nifi.controller.service;

import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ComponentNode;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.scheduling.StandardProcessScheduler;
import org.apache.nifi.events.BulletinFactory;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.logging.LogRepositoryFactory;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.reporting.Severity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static java.util.Objects.requireNonNull;

import java.util.ArrayList;
import java.util.Collection;
Expand All @@ -51,8 +35,23 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

import static java.util.Objects.requireNonNull;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ComponentNode;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.scheduling.StandardProcessScheduler;
import org.apache.nifi.events.BulletinFactory;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.logging.LogRepositoryFactory;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.reporting.Severity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StandardControllerServiceProvider implements ControllerServiceProvider {

Expand All @@ -64,6 +63,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
private final FlowManager flowManager;

private final ConcurrentMap<String, ControllerServiceNode> serviceCache = new ConcurrentHashMap<>();
private final String INVALID_CS_MESSAGE_SEGMENT = "cannot be enabled because it is not currently valid";

public StandardControllerServiceProvider(final FlowController flowController, final StandardProcessScheduler scheduler, final BulletinRepository bulletinRepo) {
this.flowController = flowController;
Expand Down Expand Up @@ -208,8 +208,14 @@ public void enableControllerServices(final Collection<ControllerServiceNode> ser
future.get(30, TimeUnit.SECONDS);
logger.debug("Successfully enabled {}; service state = {}", controllerServiceNode, controllerServiceNode.getState());
} catch (final Exception e) {
logger.warn("Failed to enable service {}", controllerServiceNode, e);
// Nothing we can really do. Will attempt to enable this service anyway.
// If the service is not currently valid, there is no need to print the entire stacktrace
if (e.getLocalizedMessage().contains(INVALID_CS_MESSAGE_SEGMENT)) {
logger.warn("Failed to enable service {} because {}", controllerServiceNode, e.getLocalizedMessage());
} else {
// Print the whole stacktrace
logger.warn("Failed to enable service {}", controllerServiceNode, e);
// Nothing we can really do. Will attempt to enable this service anyway.
}
}
}
} catch (Exception e) {
Expand Down Expand Up @@ -475,13 +481,13 @@ public boolean isControllerServiceEnabled(final ControllerService service) {
@Override
public boolean isControllerServiceEnabled(final String serviceIdentifier) {
final ControllerServiceNode node = getControllerServiceNode(serviceIdentifier);
return node == null ? false : ControllerServiceState.ENABLED == node.getState();
return node != null && ControllerServiceState.ENABLED == node.getState();
}

@Override
public boolean isControllerServiceEnabling(final String serviceIdentifier) {
final ControllerServiceNode node = getControllerServiceNode(serviceIdentifier);
return node == null ? false : ControllerServiceState.ENABLING == node.getState();
return node != null && ControllerServiceState.ENABLING == node.getState();
}

@Override
Expand Down