Skip to content

Commit

Permalink
NIFI-6201 Suppressed stacktraces for expected error conditions (conne…
Browse files Browse the repository at this point in the history
…ction timeout, missing component bundle, etc.) in application startup when the stacktrace provides no additional helpful data.

NIFI-6201 Fixed existing checkstyle complaint on missing Javadoc element.

NIFI-6201 Restored "connection refused" text to relevant registry sync error message.

NIFI-6201 Simplified error logging for bundle not found issues and removed unused bundle coordinate methods.

This closes #3427.

Signed-off-by: Kevin Doran <kdoran@apache.org>
  • Loading branch information
alopresto authored and kevdoran committed Apr 15, 2019
1 parent 7b94518 commit 22563fc
Show file tree
Hide file tree
Showing 5 changed files with 400 additions and 367 deletions.
Expand Up @@ -16,6 +16,12 @@
*/
package org.apache.nifi.controller;

import java.lang.reflect.Proxy;
import java.net.URL;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.commons.lang3.ClassUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.configuration.DefaultSettings;
Expand Down Expand Up @@ -56,13 +62,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.Proxy;
import java.net.URL;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class ExtensionBuilder {
private static final Logger logger = LoggerFactory.getLogger(ExtensionBuilder.class);

Expand Down Expand Up @@ -190,7 +189,11 @@ public ProcessorNode buildProcessor() {
try {
loggableComponent = createLoggableProcessor();
} catch (final ProcessorInstantiationException pie) {
logger.error("Could not create Processor of type " + type + " for ID " + identifier + "; creating \"Ghost\" implementation", pie);
logger.error("Could not create Processor of type " + type + " for ID " + identifier + " due to: " + pie.getMessage() + "; creating \"Ghost\" implementation");
if (logger.isDebugEnabled()) {
logger.debug(pie.getMessage(), pie);
}

final GhostProcessor ghostProc = new GhostProcessor();
ghostProc.setIdentifier(identifier);
ghostProc.setCanonicalClassName(type);
Expand Down Expand Up @@ -280,7 +283,11 @@ public ControllerServiceNode buildControllerService() {
try {
return createControllerServiceNode();
} catch (final Exception e) {
logger.error("Could not create Controller Service of type " + type + " for ID " + identifier + "; creating \"Ghost\" implementation", e);
logger.error("Could not create Controller Service of type " + type + " for ID " + identifier + " due to: " + e.getMessage() + "; creating \"Ghost\" implementation");
if (logger.isDebugEnabled()) {
logger.debug(e.getMessage(), e);
}

return createGhostControllerServiceNode();
}
}
Expand All @@ -293,12 +300,12 @@ private ProcessorNode createProcessorNode(final LoggableComponent<Processor> pro
final ProcessorNode procNode;
if (creationSuccessful) {
procNode = new StandardProcessorNode(processor, identifier, validationContextFactory, processScheduler, serviceProvider,
componentVarRegistry, reloadComponent, extensionManager, validationTrigger);
componentVarRegistry, reloadComponent, extensionManager, validationTrigger);
} else {
final String simpleClassName = type.contains(".") ? StringUtils.substringAfterLast(type, ".") : type;
final String componentType = "(Missing) " + simpleClassName;
procNode = new StandardProcessorNode(processor, identifier, validationContextFactory, processScheduler, serviceProvider,
componentType, type, componentVarRegistry, reloadComponent, extensionManager, validationTrigger, true);
componentType, type, componentVarRegistry, reloadComponent, extensionManager, validationTrigger, true);
}

applyDefaultSettings(procNode);
Expand All @@ -312,14 +319,14 @@ private ReportingTaskNode createReportingTaskNode(final LoggableComponent<Report
final ReportingTaskNode taskNode;
if (creationSuccessful) {
taskNode = new StandardReportingTaskNode(reportingTask, identifier, flowController, processScheduler,
validationContextFactory, componentVarRegistry, reloadComponent, extensionManager, validationTrigger);
validationContextFactory, componentVarRegistry, reloadComponent, extensionManager, validationTrigger);
taskNode.setName(taskNode.getReportingTask().getClass().getSimpleName());
} else {
final String simpleClassName = type.contains(".") ? StringUtils.substringAfterLast(type, ".") : type;
final String componentType = "(Missing) " + simpleClassName;

taskNode = new StandardReportingTaskNode(reportingTask, identifier, flowController, processScheduler, validationContextFactory,
componentType, type, componentVarRegistry, reloadComponent, extensionManager, validationTrigger, true);
componentType, type, componentVarRegistry, reloadComponent, extensionManager, validationTrigger, true);
taskNode.setName(componentType);
}

Expand Down Expand Up @@ -374,7 +381,7 @@ private ControllerServiceNode createControllerServiceNode() throws ClassNotFound

final StateManager stateManager = stateManagerProvider.getStateManager(identifier);
final ControllerServiceInitializationContext initContext = new StandardControllerServiceInitializationContext(identifier, terminationAwareLogger,
serviceProvider, stateManager, kerberosConfig);
serviceProvider, stateManager, kerberosConfig);
serviceImpl.initialize(initContext);

final LoggableComponent<ControllerService> originalLoggableComponent = new LoggableComponent<>(serviceImpl, bundleCoordinate, terminationAwareLogger);
Expand All @@ -383,7 +390,7 @@ private ControllerServiceNode createControllerServiceNode() throws ClassNotFound
final ComponentVariableRegistry componentVarRegistry = new StandardComponentVariableRegistry(this.variableRegistry);
final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(serviceProvider, componentVarRegistry);
final ControllerServiceNode serviceNode = new StandardControllerServiceNode(originalLoggableComponent, proxiedLoggableComponent, invocationHandler,
identifier, validationContextFactory, serviceProvider, componentVarRegistry, reloadComponent, extensionManager, validationTrigger);
identifier, validationContextFactory, serviceProvider, componentVarRegistry, reloadComponent, extensionManager, validationTrigger);
serviceNode.setName(rawClass.getSimpleName());

invocationHandler.setServiceNode(serviceNode);
Expand All @@ -407,7 +414,7 @@ private ControllerServiceNode createGhostControllerServiceNode() {
final ComponentVariableRegistry componentVarRegistry = new StandardComponentVariableRegistry(this.variableRegistry);
final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(serviceProvider, variableRegistry);
final ControllerServiceNode serviceNode = new StandardControllerServiceNode(proxiedLoggableComponent, proxiedLoggableComponent, invocationHandler, identifier,
validationContextFactory, serviceProvider, componentType, type, componentVarRegistry, reloadComponent, extensionManager, validationTrigger, true);
validationContextFactory, serviceProvider, componentType, type, componentVarRegistry, reloadComponent, extensionManager, validationTrigger, true);

return serviceNode;
}
Expand All @@ -417,7 +424,7 @@ private LoggableComponent<Processor> createLoggableProcessor() throws ProcessorI
final LoggableComponent<Processor> processorComponent = createLoggableComponent(Processor.class);

final ProcessorInitializationContext initiContext = new StandardProcessorInitializationContext(identifier, processorComponent.getLogger(),
serviceProvider, nodeTypeProvider, kerberosConfig);
serviceProvider, nodeTypeProvider, kerberosConfig);
processorComponent.getComponent().initialize(initiContext);

return processorComponent;
Expand All @@ -433,7 +440,7 @@ private LoggableComponent<ReportingTask> createLoggableReportingTask() throws Re

final String taskName = taskComponent.getComponent().getClass().getSimpleName();
final ReportingInitializationContext config = new StandardReportingInitializationContext(identifier, taskName,
SchedulingStrategy.TIMER_DRIVEN, "1 min", taskComponent.getLogger(), serviceProvider, kerberosConfig, nodeTypeProvider);
SchedulingStrategy.TIMER_DRIVEN, "1 min", taskComponent.getLogger(), serviceProvider, kerberosConfig, nodeTypeProvider);

taskComponent.getComponent().initialize(config);

Expand Down
Expand Up @@ -16,6 +16,23 @@
*/
package org.apache.nifi.controller.service;

import java.lang.reflect.InvocationTargetException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.Restricted;
import org.apache.nifi.annotation.documentation.DeprecationNotice;
Expand Down Expand Up @@ -51,24 +68,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.InvocationTargetException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class StandardControllerServiceNode extends AbstractComponentNode implements ControllerServiceNode {

private static final Logger LOG = LoggerFactory.getLogger(StandardControllerServiceNode.class);
Expand Down Expand Up @@ -288,7 +287,7 @@ public void verifyCanDelete() {

@Override
public void verifyCanDisable() {
verifyCanDisable(Collections.<ControllerServiceNode>emptySet());
verifyCanDisable(Collections.emptySet());
}

@Override
Expand Down Expand Up @@ -571,4 +570,21 @@ public void setVersionedComponentId(final String versionedComponentId) {
}
}
}

@Override
public String toString() {
String bundleCoordinate;
try {
bundleCoordinate = controllerServiceHolder.get().getBundleCoordinate().toString();
} catch (NullPointerException e) {
bundleCoordinate = "null";
}
return "StandardControllerServiceNode{" +
"controllerServiceHolder=" + bundleCoordinate +
", versionedComponentId=" + versionedComponentId +
", comment='" + comment + '\'' +
", processGroup=" + processGroup +
", active=" + active +
'}';
}
}
Expand Up @@ -16,23 +16,7 @@
*/
package org.apache.nifi.controller.service;

import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ComponentNode;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.scheduling.StandardProcessScheduler;
import org.apache.nifi.events.BulletinFactory;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.logging.LogRepositoryFactory;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.reporting.Severity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static java.util.Objects.requireNonNull;

import java.util.ArrayList;
import java.util.Collection;
Expand All @@ -51,8 +35,23 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

import static java.util.Objects.requireNonNull;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ComponentNode;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.scheduling.StandardProcessScheduler;
import org.apache.nifi.events.BulletinFactory;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.logging.LogRepositoryFactory;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.reporting.Severity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StandardControllerServiceProvider implements ControllerServiceProvider {

Expand All @@ -64,6 +63,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
private final FlowManager flowManager;

private final ConcurrentMap<String, ControllerServiceNode> serviceCache = new ConcurrentHashMap<>();
private final String INVALID_CS_MESSAGE_SEGMENT = "cannot be enabled because it is not currently valid";

public StandardControllerServiceProvider(final FlowController flowController, final StandardProcessScheduler scheduler, final BulletinRepository bulletinRepo) {
this.flowController = flowController;
Expand Down Expand Up @@ -208,8 +208,14 @@ public void enableControllerServices(final Collection<ControllerServiceNode> ser
future.get(30, TimeUnit.SECONDS);
logger.debug("Successfully enabled {}; service state = {}", controllerServiceNode, controllerServiceNode.getState());
} catch (final Exception e) {
logger.warn("Failed to enable service {}", controllerServiceNode, e);
// Nothing we can really do. Will attempt to enable this service anyway.
// If the service is not currently valid, there is no need to print the entire stacktrace
if (e.getLocalizedMessage().contains(INVALID_CS_MESSAGE_SEGMENT)) {
logger.warn("Failed to enable service {} because {}", controllerServiceNode, e.getLocalizedMessage());
} else {
// Print the whole stacktrace
logger.warn("Failed to enable service {}", controllerServiceNode, e);
// Nothing we can really do. Will attempt to enable this service anyway.
}
}
}
} catch (Exception e) {
Expand Down Expand Up @@ -475,13 +481,13 @@ public boolean isControllerServiceEnabled(final ControllerService service) {
@Override
public boolean isControllerServiceEnabled(final String serviceIdentifier) {
final ControllerServiceNode node = getControllerServiceNode(serviceIdentifier);
return node == null ? false : ControllerServiceState.ENABLED == node.getState();
return node != null && ControllerServiceState.ENABLED == node.getState();
}

@Override
public boolean isControllerServiceEnabling(final String serviceIdentifier) {
final ControllerServiceNode node = getControllerServiceNode(serviceIdentifier);
return node == null ? false : ControllerServiceState.ENABLING == node.getState();
return node != null && ControllerServiceState.ENABLING == node.getState();
}

@Override
Expand Down

0 comments on commit 22563fc

Please sign in to comment.