Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NIFI-4344: Improve bulletin messages with exception details. #5093

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -34,8 +34,6 @@

public class StandardLogRepository implements LogRepository {

public static final int DEFAULT_MAX_CAPACITY_PER_LEVEL = 10;

private final Map<LogLevel, Collection<LogObserver>> observers = new HashMap<>();
private final Map<String, LogObserver> observerLookup = new HashMap<>();

Expand Down Expand Up @@ -82,7 +80,7 @@ public void addLogMessage(final LogLevel level, final String format, final Objec
addLogMessage(level, formattedMessage, t);
}

private void replaceThrowablesWithMessage(Object[] params) {
private void replaceThrowablesWithMessage(final Object[] params) {
for (int i = 0; i < params.length; i++) {
if(params[i] instanceof Throwable) {
params[i] = ((Throwable) params[i]).getLocalizedMessage();
Expand Down
Expand Up @@ -24,8 +24,14 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.LinkedList;
import java.util.stream.Collectors;

public class SimpleProcessLogger implements ComponentLog {

public static final String NEW_LINE_ARROW = "\u21B3";
public static final String CAUSES = NEW_LINE_ARROW + " causes: ";

private final Logger logger;
private final LogRepository logRepository;
private final Object component;
Expand Down Expand Up @@ -58,7 +64,7 @@ public void warn(String msg, final Throwable t) {
}

msg = "{} " + msg;
final Object[] os = {component, t.toString(), t};
final Object[] os = {component, getCauses(t), t};
logger.warn(msg, os);
logRepository.addLogMessage(LogLevel.WARN, msg, os, t);
}
Expand Down Expand Up @@ -110,7 +116,7 @@ public void trace(String msg, Throwable t) {
}

msg = "{} " + msg;
final Object[] os = {component, t.toString(), t};
final Object[] os = {component, getCauses(t), t};
logger.trace(msg, os);
logRepository.addLogMessage(LogLevel.TRACE, msg, os, t);
}
Expand Down Expand Up @@ -184,7 +190,7 @@ public void info(String msg, Throwable t) {
}

msg = "{} " + msg;
final Object[] os = {component, t.toString()};
final Object[] os = {component, getCauses(t)};

logger.info(msg, os);
if (logger.isDebugEnabled()) {
Expand Down Expand Up @@ -245,12 +251,12 @@ public void error(String msg, Throwable t) {

if (t == null) {
msg = "{} " + msg;
final Object[] os = new Object[] {component};
final Object[] os = new Object[]{component};
logger.error(msg, os);
logRepository.addLogMessage(LogLevel.ERROR, msg, os);
} else {
msg = "{} " + msg + ": {}";
final Object[] os = new Object[] {component, t.toString(), t};
final Object[] os = new Object[]{component, getCauses(t), t};
logger.error(msg, os);
logRepository.addLogMessage(LogLevel.ERROR, msg, os, t);
}
Expand Down Expand Up @@ -301,7 +307,7 @@ private Object[] addProcessorAndThrowable(final Object[] os, final Throwable t,
modifiedArgs = new Object[os.length + 3];
modifiedArgs[0] = component.toString();
System.arraycopy(os, 0, modifiedArgs, 1, os.length);
modifiedArgs[modifiedArgs.length - 2] = t.toString();
modifiedArgs[modifiedArgs.length - 2] = getCauses(t);
modifiedArgs[modifiedArgs.length - 1] = t;
}

Expand Down Expand Up @@ -448,4 +454,12 @@ public void log(LogLevel level, String msg, Object[] os, Throwable t) {
}
}

private String getCauses(final Throwable throwable) {
Lehel44 marked this conversation as resolved.
Show resolved Hide resolved
final LinkedList<String> causes = new LinkedList<>();
for (Throwable t = throwable; t != null; t = t.getCause()) {
causes.push(t.toString());
}
return causes.stream().collect(Collectors.joining(System.lineSeparator() + CAUSES));
}

}
Expand Up @@ -16,6 +16,15 @@
*/
package org.apache.nifi.processor;

import org.apache.nifi.logging.LogLevel;
import org.apache.nifi.reporting.ReportingTask;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;

import java.lang.reflect.Field;

import static org.apache.nifi.processor.SimpleProcessLogger.NEW_LINE_ARROW;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
Expand All @@ -24,16 +33,13 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.lang.reflect.Field;
public class TestSimpleProcessLogger {

import org.apache.nifi.logging.LogLevel;
import org.apache.nifi.reporting.ReportingTask;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
private static final String EXPECTED_CAUSES = "java.lang.RuntimeException: third" + System.lineSeparator() +
NEW_LINE_ARROW + " causes: java.lang.RuntimeException: second" + System.lineSeparator() +
NEW_LINE_ARROW + " causes: java.lang.RuntimeException: first";

public class TestSimpleProcessLogger {
private final Exception e = new RuntimeException("intentional");
private final Exception e = new RuntimeException("first", new RuntimeException("second", new RuntimeException("third")));

private ReportingTask task;

Expand Down Expand Up @@ -68,7 +74,7 @@ public void before() {
@Test
public void validateDelegateLoggerReceivesThrowableToStringOnError() {
componentLog.error("Hello {}", e);
verify(logger, times(1)).error(anyString(), eq(task), eq(e.toString()), eq(e));
verify(logger, times(1)).error(anyString(), eq(task), eq(EXPECTED_CAUSES), eq(e));
}

@Test
Expand All @@ -80,24 +86,24 @@ public void validateDelegateLoggerReceivesThrowableToStringOnInfo() {
@Test
public void validateDelegateLoggerReceivesThrowableToStringOnTrace() {
componentLog.trace("Hello {}", e);
verify(logger, times(1)).trace(anyString(), eq(task), eq(e.toString()), eq(e));
verify(logger, times(1)).trace(anyString(), eq(task), eq(EXPECTED_CAUSES), eq(e));
}

@Test
public void validateDelegateLoggerReceivesThrowableToStringOnWarn() {
componentLog.warn("Hello {}", e);
verify(logger, times(1)).warn(anyString(), eq(task), eq(e.toString()), eq(e));
verify(logger, times(1)).warn(anyString(), eq(task), eq(EXPECTED_CAUSES), eq(e));
}

@Test
public void validateDelegateLoggerReceivesThrowableToStringOnLogWithLevel() {
componentLog.log(LogLevel.WARN, "Hello {}", e);
verify(logger, times(1)).warn(anyString(), eq(task), eq(e.toString()), eq(e));
verify(logger, times(1)).warn(anyString(), eq(task), eq(EXPECTED_CAUSES), eq(e));
componentLog.log(LogLevel.ERROR, "Hello {}", e);
verify(logger, times(1)).error(anyString(), eq(task), eq(e.toString()), eq(e));
verify(logger, times(1)).error(anyString(), eq(task), eq(EXPECTED_CAUSES), eq(e));
componentLog.log(LogLevel.INFO, "Hello {}", e);
verify(logger, times(1)).info(anyString(), eq(e));
componentLog.log(LogLevel.TRACE, "Hello {}", e);
verify(logger, times(1)).trace(anyString(), eq(task), eq(e.toString()), eq(e));
verify(logger, times(1)).trace(anyString(), eq(task), eq(EXPECTED_CAUSES), eq(e));
}
}
Expand Up @@ -251,8 +251,8 @@ public InvocationResult invoke() {
} catch (final Throwable t) {
// Use ComponentLog to log the event so that a bulletin will be created for this processor
final ComponentLog procLog = new SimpleProcessLogger(connectable.getIdentifier(), connectable.getRunnableComponent());
procLog.error("{} failed to process session due to {}; Processor Administratively Yielded for {}",
new Object[] {connectable.getRunnableComponent(), t, schedulingAgent.getAdministrativeYieldDuration()}, t);
procLog.error("Failed to process session due to {}; Processor Administratively Yielded for {}",
new Object[] {t, schedulingAgent.getAdministrativeYieldDuration()}, t);
Lehel44 marked this conversation as resolved.
Show resolved Hide resolved
logger.warn("Administratively Yielding {} due to uncaught Exception: {}", connectable.getRunnableComponent(), t.toString(), t);

connectable.yield(schedulingAgent.getAdministrativeYieldDuration(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
Expand Down