implements Validator {
@@ -51,15 +51,22 @@ public boolean validate(String name, Object value) {
ReflectionUtils.forName((String) value, interfaceClass);
return true;
} catch (RuntimeException re) {
- log.warn("Setting value of '{}' is not '{}' : {}",
- name, interfaceClass.getName(), value, re);
+ log.warn()
+ .exception(re)
+ .attr("name", name)
+ .attr("interfaceClass", interfaceClass.getName())
+ .attr("value", value)
+ .log("Setting value does not match expected interface");
return false;
}
} else if (value instanceof Class) {
Class cls = (Class) value;
if (!interfaceClass.isAssignableFrom(cls)) {
- log.warn("Setting value of '{}' is not '{}' : {}",
- name, interfaceClass.getName(), cls.getName());
+ log.warn()
+ .attr("name", name)
+ .attr("interfaceClass", interfaceClass.getName())
+ .attr("value", cls.getName())
+ .log("Setting value does not match expected interface");
return false;
} else {
return true;
diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/SingleThreadExecutor.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/SingleThreadExecutor.java
index d48d8e3613b..31cb8aef320 100644
--- a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/SingleThreadExecutor.java
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/SingleThreadExecutor.java
@@ -32,8 +32,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
+import lombok.CustomLog;
import lombok.SneakyThrows;
-import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.collections.GrowableMpScArrayConsumerBlockingQueue;
import org.apache.bookkeeper.stats.Gauge;
import org.apache.bookkeeper.stats.StatsLogger;
@@ -44,7 +44,7 @@
* Tasks are executed in a safe manner: if there are exceptions they are logged and the executor will
* proceed with the next tasks.
*/
-@Slf4j
+@CustomLog
public class SingleThreadExecutor extends AbstractExecutorService implements ExecutorService, Runnable {
private final BlockingQueue queue;
private final Thread runner;
@@ -134,7 +134,7 @@ public void run() {
// Exit loop when interrupted
Thread.currentThread().interrupt();
} catch (Throwable t) {
- log.error("Exception in executor: {}", t.getMessage(), t);
+ log.error().exception(t).log("Exception in executor");
throw t;
} finally {
state = State.Terminated;
@@ -151,7 +151,7 @@ private boolean safeRunTask(Runnable r) {
return false;
} else {
tasksFailed.increment();
- log.error("Error while running task: {}", t.getMessage(), t);
+ log.error().exception(t).log("Error while running task");
}
} finally {
decrementPendingTaskCount(1);
@@ -289,9 +289,10 @@ private void decrementPendingTaskCount(int count) {
}
int currentPendingCount = pendingTaskCountUpdater.addAndGet(this, -count);
- if (log.isDebugEnabled()) {
- log.debug("Released {} task(s), current pending count: {}", count, currentPendingCount);
- }
+ log.debug()
+ .attr("released", count)
+ .attr("currentPendingCount", currentPendingCount)
+ .log("Released tasks");
}
private void reject() {
diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/SingleThreadSafeScheduledExecutorService.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/SingleThreadSafeScheduledExecutorService.java
index d41f2411d3e..66471102dfa 100644
--- a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/SingleThreadSafeScheduledExecutorService.java
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/SingleThreadSafeScheduledExecutorService.java
@@ -23,9 +23,9 @@
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
-import lombok.extern.slf4j.Slf4j;
+import lombok.CustomLog;
-@Slf4j
+@CustomLog
public class SingleThreadSafeScheduledExecutorService extends ScheduledThreadPoolExecutor
implements ScheduledExecutorService {
@@ -45,7 +45,10 @@ public void run() {
try {
task.run();
} catch (Throwable t) {
- log.warn("Unexpected throwable from task {}: {}", task.getClass(), t.getMessage(), t);
+ log.warn()
+ .exception(t)
+ .attr("taskClass", task.getClass())
+ .log("Unexpected throwable from task");
}
}
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/datainteg/DataIntegrityCheckImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/datainteg/DataIntegrityCheckImpl.java
index 70c6e23882a..a98aa5055f5 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/datainteg/DataIntegrityCheckImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/datainteg/DataIntegrityCheckImpl.java
@@ -40,7 +40,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
-import lombok.extern.slf4j.Slf4j;
+import lombok.CustomLog;
import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.bookie.LedgerStorage;
import org.apache.bookkeeper.bookie.LedgerStorage.StorageState;
@@ -54,7 +54,7 @@
/**
* An implementation of the DataIntegrityCheck interface.
*/
-@Slf4j
+@CustomLog
public class DataIntegrityCheckImpl implements DataIntegrityCheck {
private static final int MAX_INFLIGHT = 300;
private static final int MAX_ENTRIES_INFLIGHT = 3000;
@@ -95,11 +95,19 @@ public synchronized CompletableFuture runPreBootCheck(String reason) {
private CompletableFuture runPreBootSequence(String reason) {
String runId = UUID.randomUUID().toString();
- log.info("Event: {}, RunId: {}, Reason: {}", Events.PREBOOT_START, runId, reason);
+ log.info()
+ .attr("event", Events.PREBOOT_START)
+ .attr("runId", runId)
+ .attr("reason", reason)
+ .log("Preboot start");
try {
this.ledgerStorage.setStorageStateFlag(StorageState.NEEDS_INTEGRITY_CHECK);
} catch (IOException ioe) {
- log.error("Event: {}, RunId: {}", Events.PREBOOT_ERROR, runId, ioe);
+ log.error()
+ .exception(ioe)
+ .attr("event", Events.PREBOOT_ERROR)
+ .attr("runId", runId)
+ .log("Preboot error");
return FutureUtils.exception(ioe);
}
@@ -116,8 +124,12 @@ private CompletableFuture runPreBootSequence(String reason) {
ledgerStorage.setMasterKey(ledgerId, new byte[0]);
}
} catch (IOException ioe) {
- log.error("Event: {}, RunId: {}, LedgerId: {}",
- Events.ENSURE_LEDGER_ERROR, runId, ledgerId, ioe);
+ log.error()
+ .exception(ioe)
+ .attr("event", Events.ENSURE_LEDGER_ERROR)
+ .attr("runId", runId)
+ .attr("ledgerId", ledgerId)
+ .log("Ensure ledger error");
return FutureUtils.exception(ioe);
}
}
@@ -125,7 +137,11 @@ private CompletableFuture runPreBootSequence(String reason) {
})
.whenComplete((ignore, exception) -> {
if (exception != null) {
- log.error("Event: {}, runId: {}", Events.PREBOOT_ERROR, runId, exception);
+ log.error()
+ .exception(exception)
+ .attr("event", Events.PREBOOT_ERROR)
+ .attr("runId", runId)
+ .log("Preboot error");
promise.completeExceptionally(exception);
} else {
try {
@@ -133,11 +149,18 @@ private CompletableFuture runPreBootSequence(String reason) {
updateMetadataCache(ledgersCache);
- log.info("Event: {}, runId: {}, processed: {}",
- Events.PREBOOT_END, runId, ledgersCache.size());
+ log.info()
+ .attr("event", Events.PREBOOT_END)
+ .attr("runId", runId)
+ .attr("processed", ledgersCache.size())
+ .log("Preboot end");
promise.complete(null);
} catch (Throwable t) {
- log.error("Event: {}, runId: {}", Events.PREBOOT_ERROR, runId, t);
+ log.error()
+ .exception(t)
+ .attr("event", Events.PREBOOT_ERROR)
+ .attr("runId", runId)
+ .log("Preboot error");
promise.completeExceptionally(t);
}
}
@@ -155,12 +178,18 @@ public boolean needsFullCheck() throws IOException {
public CompletableFuture runFullCheck() {
String runId = UUID.randomUUID().toString();
- log.info("Event: {}, runId: {}", Events.FULL_CHECK_INIT, runId);
+ log.info()
+ .attr("event", Events.FULL_CHECK_INIT)
+ .attr("runId", runId)
+ .log("Full check init");
return getCachedOrReadMetadata(runId)
.thenCompose(
(ledgers) -> {
- log.info("Event: {}, runId: {}, ledgerCount: {}",
- Events.FULL_CHECK_START, runId, ledgers.size());
+ log.info()
+ .attr("event", Events.FULL_CHECK_START)
+ .attr("runId", runId)
+ .attr("ledgerCount", ledgers.size())
+ .log("Full check start");
return checkAndRecoverLedgers(ledgers, runId).thenApply((resolved) -> {
for (LedgerResult r : resolved) {
if (r.isMissing() || r.isOK()) {
@@ -175,19 +204,24 @@ public CompletableFuture runFullCheck() {
.map(r -> r.getThrowable()).findFirst();
if (firstError.isPresent()) {
- log.error("Event: {}, runId: {}, ok: {}"
- + ", error: {}, missing: {}, ledgersToRetry: {}",
- Events.FULL_CHECK_END, runId,
- resolved.stream().filter(r -> r.isOK()).count(),
- resolved.stream().filter(r -> r.isError()).count(),
- resolved.stream().filter(r -> r.isMissing()).count(),
- ledgers.size(), firstError.get());
+ log.error()
+ .exception(firstError.get())
+ .attr("event", Events.FULL_CHECK_END)
+ .attr("runId", runId)
+ .attr("ok", resolved.stream().filter(r -> r.isOK()).count())
+ .attr("error", resolved.stream().filter(r -> r.isError()).count())
+ .attr("missing", resolved.stream().filter(r -> r.isMissing()).count())
+ .attr("ledgersToRetry", ledgers.size())
+ .log("Full check end with errors");
} else {
- log.info("Event: {}, runId: {}, ok: {}, error: 0, missing: {}, ledgersToRetry: {}",
- Events.FULL_CHECK_END, runId,
- resolved.stream().filter(r -> r.isOK()).count(),
- resolved.stream().filter(r -> r.isMissing()).count(),
- ledgers.size());
+ log.info()
+ .attr("event", Events.FULL_CHECK_END)
+ .attr("runId", runId)
+ .attr("ok", resolved.stream().filter(r -> r.isOK()).count())
+ .attr("error", 0)
+ .attr("missing", resolved.stream().filter(r -> r.isMissing()).count())
+ .attr("ledgersToRetry", ledgers.size())
+ .log("Full check end");
}
return ledgers;
});
@@ -198,16 +232,26 @@ public CompletableFuture runFullCheck() {
try {
this.ledgerStorage.flush();
if (ledgers.isEmpty()) {
- log.info("Event: {}, runId: {}", Events.CLEAR_INTEGCHECK_FLAG, runId);
+ log.info()
+ .attr("event", Events.CLEAR_INTEGCHECK_FLAG)
+ .attr("runId", runId)
+ .log("Clearing integrity check flag");
this.ledgerStorage.clearStorageStateFlag(
StorageState.NEEDS_INTEGRITY_CHECK);
}
// not really needed as we are modifying the map in place
updateMetadataCache(ledgers);
- log.info("Event: {}, runId: {}", Events.FULL_CHECK_COMPLETE, runId);
+ log.info()
+ .attr("event", Events.FULL_CHECK_COMPLETE)
+ .attr("runId", runId)
+ .log("Full check complete");
promise.complete(null);
} catch (IOException ioe) {
- log.error("Event: {}, runId: {}", Events.FULL_CHECK_ERROR, runId, ioe);
+ log.error()
+ .exception(ioe)
+ .attr("event", Events.FULL_CHECK_ERROR)
+ .attr("runId", runId)
+ .log("Full check error");
promise.completeExceptionally(ioe);
}
return promise;
@@ -221,11 +265,17 @@ void updateMetadataCache(Map ledgers) {
CompletableFuture