Skip to content
Permalink
Browse files
Add initial version of automatic troubleshooter (#3296)
* GOBBLIN-1457 Add initial version of automatic troubleshooter

Gobblin users, developers and SREs spend a significant amount of time
troubleshooting issues with their jobs and flows. Previously,
troubleshooting required looking into logs across multiple systems,
like Hadoop mappers, Azkaban jobs and Gobblin service. Log messages
required knowledge of how Gobblin works internally, and users frequently
had to involve Gobblin developers to understand them.

This is an initial commit for automatic Gobblin troubleshooter. The
current implementation will intercept logs from Mappers and Azkaban job
and compile a list of issues based on them. This list will be filtered
and prioritized. Then it will be displayed to the user in logs, and
forwarded as an event for consumption by Gobblin service.

Future commits will add issue reporting in Gobblin service, as well
as improve on issue filtering & refining.

For more information, check the docs on AutomaticTroubleshooter class,
and review the design doc:
https://docs.google.com/document/d/1BAYr-dHtdauX6Uf13VP3-IlHkyUQ5V64PjFIg9MtpaA/edit#

* Refactored troubleshooter to remove log4j dependency from runtime

* Addressed code review comments
  • Loading branch information
aplex committed Jul 1, 2021
1 parent 4b1d57f commit d5a85e35cff176d9bfa833c668a395785357e38c
Showing 39 changed files with 2,159 additions and 43 deletions.
@@ -95,6 +95,7 @@ dependencies {
compile project(path: ':gobblin-restli:gobblin-throttling-service:gobblin-throttling-service-api', configuration: 'restClient')
compile project(':gobblin-restli:gobblin-throttling-service:gobblin-throttling-service-client')
compile project(':gobblin-restli:gobblin-throttling-service:gobblin-throttling-service-server')
compile project(':gobblin-modules:gobblin-troubleshooter')
compile project(':gobblin-tunnel')
compile project(':gobblin-utility')
compile project(':gobblin-yarn')
@@ -17,11 +17,11 @@

package org.apache.gobblin.configuration;

import com.google.common.base.Charsets;

import java.nio.charset.Charset;
import java.util.concurrent.TimeUnit;

import com.google.common.base.Charsets;


/**
* A central place for all Gobblin configuration property keys.
@@ -252,6 +252,7 @@ public class ConfigurationKeys {
public static final String TASK_ATTEMPT_ID_KEY = "task.AttemptId";
public static final String JOB_CONFIG_FILE_PATH_KEY = "job.config.path";
public static final String TASK_FAILURE_EXCEPTION_KEY = "task.failure.exception";
public static final String TASK_ISSUES_KEY = "task.issues";
public static final String JOB_FAILURE_EXCEPTION_KEY = "job.failure.exception";
public static final String TASK_RETRIES_KEY = "task.retries";
public static final String TASK_IGNORE_CLOSE_FAILURES = "task.ignoreCloseFailures";
@@ -1078,4 +1079,27 @@ public class ConfigurationKeys {
public static final String STAGING_DIR_DEFAULT_SUFFIX = "/" + TMP_DIR + "/taskStaging";
public static final String OUTPUT_DIR_DEFAULT_SUFFIX = "/" + TMP_DIR + "/taskOutput";
public static final String ROW_LEVEL_ERR_FILE_DEFAULT_SUFFIX = "/err";


/**
* Troubleshooter configuration
*/

/**
* Disables all troubleshooter functions
* */
public static final String TROUBLESHOOTER_DISABLED = "gobblin.troubleshooter.disabled";

/**
* Disables reporting troubleshooter issues as GobblinTrackingEvents
* */
public static final String TROUBLESHOOTER_DISABLE_EVENT_REPORTING = "gobblin.troubleshooter.disableEventReporting";

/**
* The maximum number of issues that In-memory troubleshooter repository will keep.
*
* This setting can control memory usage of the troubleshooter.
* */
public static final String TROUBLESHOOTER_IN_MEMORY_ISSUE_REPOSITORY_MAX_SIZE = "gobblin.troubleshooter.inMemoryIssueRepository.maxSize";
public static final int DEFAULT_TROUBLESHOOTER_IN_MEMORY_ISSUE_REPOSITORY_MAX_SIZE = 100;
}
@@ -177,7 +177,8 @@ public GobblinHelixJobLauncher (Properties jobProps,
this.jobContext.getJobState(),
this.eventBus,
this.stateStores.getTaskStateStore(),
this.outputTaskStateDir);
this.outputTaskStateDir,
this.getIssueRepository());

this.helixMetrics = helixMetrics;
startCancellationExecutor();
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

apply plugin: 'java'

dependencies {
implementation project(":gobblin-api")
implementation project(":gobblin-runtime")

implementation externalDependency.log4j

testImplementation externalDependency.assertj
testImplementation externalDependency.mockito
testImplementation externalDependency.testng
}

test {
workingDir rootProject.rootDir
}

ext.classification="library"
@@ -0,0 +1,173 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.troubleshooter;

import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.Level;
import org.apache.log4j.spi.LocationInfo;
import org.apache.log4j.spi.LoggingEvent;

import javax.annotation.concurrent.ThreadSafe;
import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.runtime.ThrowableWithErrorCode;
import org.apache.gobblin.runtime.troubleshooter.Issue;
import org.apache.gobblin.runtime.troubleshooter.IssueRepository;
import org.apache.gobblin.runtime.troubleshooter.IssueSeverity;
import org.apache.gobblin.runtime.troubleshooter.TroubleshooterException;


/**
* Collects messages from log4j and converts them into issues that are used in {@link AutomaticTroubleshooter}.
*/
@Slf4j
@ThreadSafe
public class AutoTroubleshooterLogAppender extends AppenderSkeleton {
private static final int AUTO_GENERATED_HASH_LENGTH = 6;
private static final String AUTO_GENERATED_HASH_PREFIX = "T";

private final IssueRepository repository;

private final AtomicBoolean reportedRepositoryError = new AtomicBoolean(false);
private final AtomicInteger processedEventCount = new AtomicInteger();

public AutoTroubleshooterLogAppender(IssueRepository issueRepository) {
this.repository = Objects.requireNonNull(issueRepository);
}

private static String getHash(String text) {
return AUTO_GENERATED_HASH_PREFIX + DigestUtils.sha256Hex(text).substring(0, AUTO_GENERATED_HASH_LENGTH)
.toUpperCase();
}

public int getProcessedEventCount() {
return processedEventCount.get();
}

@Override
protected void append(LoggingEvent event) {
processedEventCount.incrementAndGet();

Issue issue = convertToIssue(event);

try {
repository.put(issue);
} catch (TroubleshooterException e) {
if (reportedRepositoryError.compareAndSet(false, true)) {
log.warn("Failed to save the issue to the repository", e);
}
}
}

private Issue convertToIssue(LoggingEvent event) {
Issue.IssueBuilder issueBuilder =
Issue.builder().time(ZonedDateTime.ofInstant(Instant.ofEpochMilli(event.getTimeStamp()), ZoneOffset.UTC))
.severity(convert(event.getLevel())).code(getIssueCode(event)).sourceClass(event.getLoggerName());

if (event.getThrowableInformation() != null) {
Throwable throwable = event.getThrowableInformation().getThrowable();
issueBuilder.details(ExceptionUtils.getStackTrace(throwable));

String summarizedException =
StringUtils.substringBefore(ExceptionUtils.getRootCauseMessage(throwable), System.lineSeparator());
issueBuilder.summary(summarizedException + " | " + event.getRenderedMessage());
} else {
issueBuilder.summary(event.getRenderedMessage());
}

return issueBuilder.build();
}

private String getIssueCode(LoggingEvent event) {
if (event.getThrowableInformation() != null) {
return getIssueCode(event.getThrowableInformation().getThrowable());
}

LocationInfo locationInformation = event.getLocationInformation();

if (locationInformation.fullInfo != null) {
String locationInCode = locationInformation.getClassName() + locationInformation.getLineNumber();
return getHash(locationInCode);
} else {
return getHash(event.getLoggerName() + event.getMessage().toString());
}
}

private String getIssueCode(Throwable throwable) {
if (throwable instanceof ThrowableWithErrorCode) {
return ((ThrowableWithErrorCode) throwable).getErrorCode();
}

/*
* Ideally, each exception should have a unique machine-readable error code. Then we can easily group them together
* and remove duplicates. However, it’s not feasible to add error codes to the large legacy codebase overnight, so
* we generate them automatically.
*
* Good error codes should identify one specific problem, so they don’t always map to exception types.
* For example “FileNotFoundException” can mean that a user's file is not found, or some config file that job
* expects was not found, or credentials file is missing, and so on.
*
* Exception messages can have path names, job ids, and other unpredictable variable parts.
* So, even if the problem is exactly the same, the messages could be different.
*
* We pick an option to generate an error code as a hash of exception type and stack trace. This will produce
* a unique error code of the situation. However, when a codebase is refactored, stacktraces can change.
* As a result, such automatic error codes can be different between application versions.
* This should be fine within a single job, but it can affect system-wide reports that process data from
* multiple application versions.
* */

return getHash(throwable.getClass().toString() + ExceptionUtils.getStackTrace(throwable));
}

private IssueSeverity convert(Level level) {
if (level == Level.TRACE || level == Level.DEBUG) {
return IssueSeverity.DEBUG;
} else if (level == Level.INFO) {
return IssueSeverity.INFO;
} else if (level == Level.WARN) {
return IssueSeverity.WARN;
} else if (level == Level.ERROR) {
return IssueSeverity.ERROR;
} else if (level == Level.FATAL) {
return IssueSeverity.FATAL;
}

return IssueSeverity.DEBUG;
}

@Override
public void close() {

}

@Override
public boolean requiresLayout() {
return false;
}
}

0 comments on commit d5a85e3

Please sign in to comment.