Skip to content

Commit

Permalink
[FLINK-34417] Log Job ID via MDC
Browse files Browse the repository at this point in the history
  • Loading branch information
rkhachatryan committed Mar 8, 2024
1 parent 4171b98 commit d6a4eb9
Show file tree
Hide file tree
Showing 31 changed files with 1,148 additions and 258 deletions.
15 changes: 15 additions & 0 deletions docs/content.zh/docs/deployment/advanced/logging.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,21 @@ Flink 中的日志记录是使用 [SLF4J](http://www.slf4j.org/) 日志接口实

<a name="configuring-log4j-2"></a>

### Structured logging

Flink adds the following fields to [MDC](https://www.slf4j.org/api/org/slf4j/MDC.html) of most of the relevant log messages (experimental feature):
- Job ID
- key: `flink-job-id`
- format: string
- length 32

This is most useful in environments with structured logging and allows you to quickly filter the relevant logs.

The MDC is propagated by slf4j to the logging backend which usually adds it to the log records automatically (e.g. in [log4j json layout](https://logging.apache.org/log4j/2.x/manual/json-template-layout.html)).
Alternatively, it can be configured explicitly - [log4j pattern layout](https://logging.apache.org/log4j/1.2/apidocs/org/apache/log4j/PatternLayout.html) might look like this:

`[%-32X{flink-job-id}] %c{0} %m%n`.

## 配置 Log4j 2

Log4j 2 是通过 property 配置文件进行配置的。
Expand Down
14 changes: 14 additions & 0 deletions docs/content/docs/deployment/advanced/logging.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,20 @@ This allows you to use any logging framework that supports SLF4J, without having
By default, [Log4j 2](https://logging.apache.org/log4j/2.x/index.html) is used as the underlying logging framework.


### Structured logging

Flink adds the following fields to [MDC](https://www.slf4j.org/api/org/slf4j/MDC.html) of most of the relevant log messages (experimental feature):
- Job ID
- key: `flink-job-id`
- format: string
- length 32

This is most useful in environments with structured logging and allows you to quickly filter the relevant logs.

The MDC is propagated by slf4j to the logging backend which usually adds it to the log records automatically (e.g. in [log4j json layout](https://logging.apache.org/log4j/2.x/manual/json-template-layout.html)).
Alternatively, it can be configured explicitly - [log4j pattern layout](https://logging.apache.org/log4j/1.2/apidocs/org/apache/log4j/PatternLayout.html) might look like this:

`[%-32X{flink-job-id}] %c{0} %m%n`.

## Configuring Log4j 2

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.util;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.Executor;

import static org.apache.flink.util.Preconditions.checkNotNull;

class MdcAwareExecutor<T extends Executor> implements Executor {
protected final Map<String, String> contextData;
protected final T delegate;

protected MdcAwareExecutor(T delegate, Map<String, String> contextData) {
this.delegate = checkNotNull(delegate);
this.contextData = Collections.unmodifiableMap(checkNotNull(contextData));
}

public void execute(Runnable command) {
delegate.execute(MdcUtils.wrapRunnable(contextData, command));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.util;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static org.apache.flink.util.MdcUtils.wrapCallable;
import static org.apache.flink.util.MdcUtils.wrapRunnable;

class MdcAwareExecutorService<S extends ExecutorService> extends MdcAwareExecutor<S>
implements ExecutorService {

public MdcAwareExecutorService(S delegate, Map<String, String> contextData) {
super(delegate, contextData);
}

@Override
public void shutdown() {
delegate.shutdown();
}

@Override
public List<Runnable> shutdownNow() {
return delegate.shutdownNow();
}

@Override
public boolean isShutdown() {
return delegate.isShutdown();
}

@Override
public boolean isTerminated() {
return delegate.isTerminated();
}

@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return delegate.awaitTermination(timeout, unit);
}

@Override
public <T> Future<T> submit(Callable<T> task) {
return delegate.submit(wrapCallable(contextData, task));
}

@Override
public <T> Future<T> submit(Runnable task, T result) {
return delegate.submit(wrapRunnable(contextData, task), result);
}

@Override
public Future<?> submit(Runnable task) {
return delegate.submit(wrapRunnable(contextData, task));
}

@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
return delegate.invokeAll(wrapCallables(tasks));
}

@Override
public <T> List<Future<T>> invokeAll(
Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException {
return delegate.invokeAll(wrapCallables(tasks), timeout, unit);
}

@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
return delegate.invokeAny(wrapCallables(tasks));
}

@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return delegate.invokeAny(wrapCallables(tasks), timeout, unit);
}

private <T> List<Callable<T>> wrapCallables(Collection<? extends Callable<T>> tasks) {
List<Callable<T>> list = new ArrayList<>(tasks.size());
for (Callable<T> task : tasks) {
list.add(wrapCallable(contextData, task));
}
return list;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.util;

import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import static org.apache.flink.util.MdcUtils.wrapCallable;
import static org.apache.flink.util.MdcUtils.wrapRunnable;

class MdcAwareScheduledExecutorService extends MdcAwareExecutorService<ScheduledExecutorService>
implements ScheduledExecutorService {

public MdcAwareScheduledExecutorService(
ScheduledExecutorService delegate, Map<String, String> contextData) {
super(delegate, contextData);
}

@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
return delegate.schedule(wrapRunnable(contextData, command), delay, unit);
}

@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
return delegate.schedule(wrapCallable(contextData, callable), delay, unit);
}

@Override
public ScheduledFuture<?> scheduleAtFixedRate(
Runnable command, long initialDelay, long period, TimeUnit unit) {
return delegate.scheduleAtFixedRate(
wrapRunnable(contextData, command), initialDelay, period, unit);
}

@Override
public ScheduledFuture<?> scheduleWithFixedDelay(
Runnable command, long initialDelay, long delay, TimeUnit unit) {
return delegate.scheduleWithFixedDelay(
wrapRunnable(contextData, command), initialDelay, delay, unit);
}
}
112 changes: 112 additions & 0 deletions flink-core/src/main/java/org/apache/flink/util/MdcUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.util;

import org.apache.flink.api.common.JobID;

import org.slf4j.MDC;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;

import static org.apache.flink.util.Preconditions.checkArgument;

/** Utility class to manage common Flink attributes in {@link MDC} (only {@link JobID} ATM). */
public class MdcUtils {

public static final String JOB_ID = "flink-job-id";

/**
* Replace MDC contents with the provided one and return a closeable object that can be used to
* restore the original MDC.
*
* @param context to put into MDC
*/
public static MdcCloseable withContext(Map<String, String> context) {
final Map<String, String> orig = MDC.getCopyOfContextMap();
MDC.setContextMap(context);
return () -> MDC.setContextMap(orig);
}

/** {@link AutoCloseable } that restores the {@link MDC} contents on close. */
public interface MdcCloseable extends AutoCloseable {
@Override
void close();
}

/**
* Wrap the given {@link Runnable} so that the given data is added to {@link MDC} before its
* execution and removed afterward.
*/
public static Runnable wrapRunnable(Map<String, String> contextData, Runnable command) {
return () -> {
try (MdcCloseable ctx = withContext(contextData)) {
command.run();
}
};
}

/**
* Wrap the given {@link Callable} so that the given data is added to {@link MDC} before its
* execution and removed afterward.
*/
public static <T> Callable<T> wrapCallable(
Map<String, String> contextData, Callable<T> command) {
return () -> {
try (MdcCloseable ctx = withContext(contextData)) {
return command.call();
}
};
}

/**
* Wrap the given {@link Executor} so that the given {@link JobID} is added before it executes
* any submitted commands and removed afterward.
*/
public static Executor scopeToJob(JobID jobID, Executor executor) {
checkArgument(!(executor instanceof MdcAwareExecutor));
return new MdcAwareExecutor<>(executor, asContextData(jobID));
}

/**
* Wrap the given {@link ExecutorService} so that the given {@link JobID} is added before it
* executes any submitted commands and removed afterward.
*/
public static ExecutorService scopeToJob(JobID jobID, ExecutorService delegate) {
checkArgument(!(delegate instanceof MdcAwareExecutorService));
return new MdcAwareExecutorService<>(delegate, asContextData(jobID));
}

/**
* Wrap the given {@link ScheduledExecutorService} so that the given {@link JobID} is added
* before it executes any submitted commands and removed afterward.
*/
public static ScheduledExecutorService scopeToJob(JobID jobID, ScheduledExecutorService ses) {
checkArgument(!(ses instanceof MdcAwareScheduledExecutorService));
return new MdcAwareScheduledExecutorService(ses, asContextData(jobID));
}

public static Map<String, String> asContextData(JobID jobID) {
return Collections.singletonMap(JOB_ID, jobID.toHexString());
}
}
Loading

0 comments on commit d6a4eb9

Please sign in to comment.