Skip to content

Commit

Permalink
[Kernel] Use SLF4J for logging in kernel-api
Browse files Browse the repository at this point in the history
- [ ] Spark
- [ ] Standalone
- [ ] Flink
- [X] Kernel
- [ ] Other (fill in here)

Resolves #2230

Uses SLF4J for logging in kernel-API. See #2230 for the decision doc on this.

Also adds log4j for logging in tests.

Temporarily changed log level=DEBUG and ran tests in both kernelApi and kernelDefaults and confirmed that logs were outputted.

Closes #2305

Signed-off-by: Allison Portis <allison.portis@databricks.com>
GitOrigin-RevId: eed1be726887a2faac5c3a22ad3e5f6fdad82b49
  • Loading branch information
allisonport-db committed Nov 20, 2023
1 parent 35e5d69 commit 13f7fbc
Show file tree
Hide file tree
Showing 7 changed files with 120 additions and 64 deletions.
7 changes: 5 additions & 2 deletions build.sbt
Expand Up @@ -222,11 +222,13 @@ lazy val kernelApi = (project in file("kernel/kernel-api"))
Test / javaOptions ++= Seq("-ea"),
libraryDependencies ++= Seq(
"org.roaringbitmap" % "RoaringBitmap" % "0.9.25",
"org.slf4j" % "slf4j-api" % "2.0.9",

"com.fasterxml.jackson.core" % "jackson-databind" % "2.13.5" % "test",
"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
"junit" % "junit" % "4.13" % "test",
"com.novocode" % "junit-interface" % "0.11" % "test"
"com.novocode" % "junit-interface" % "0.11" % "test",
"org.slf4j" % "slf4j-log4j12" % "2.0.9" % "test"
),
javaCheckstyleSettings("kernel/dev/checkstyle.xml"),
// Unidoc settings
Expand All @@ -251,7 +253,8 @@ lazy val kernelDefaults = (project in file("kernel/kernel-defaults"))
"org.scalatest" %% "scalatest" % scalaTestVersion % "test",
"junit" % "junit" % "4.13" % "test",
"commons-io" % "commons-io" % "2.8.0" % "test",
"com.novocode" % "junit-interface" % "0.11" % "test"
"com.novocode" % "junit-interface" % "0.11" % "test",
"org.slf4j" % "slf4j-log4j12" % "2.0.9" % "test"
),
javaCheckstyleSettings("kernel/dev/checkstyle.xml"),
// Unidoc settings
Expand Down
Expand Up @@ -25,9 +25,8 @@

import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.snapshot.SnapshotManager;
import io.delta.kernel.internal.util.Logging;

public class TableImpl implements Table, Logging {
public class TableImpl implements Table {
public static Table forPath(TableClient tableClient, String path)
throws TableNotFoundException {
// Resolve the path to fully qualified table path using the `TableClient` APIs
Expand Down
Expand Up @@ -36,7 +36,6 @@
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.lang.Lazy;
import io.delta.kernel.internal.snapshot.LogSegment;
import io.delta.kernel.internal.util.Logging;
import io.delta.kernel.internal.util.Tuple2;

/**
Expand All @@ -56,7 +55,7 @@
* - {@link #getAddFilesAsColumnarBatches}: return all active (not tombstoned) AddFiles as
* {@link ColumnarBatch}s
*/
public class LogReplay implements Logging {
public class LogReplay {

/** Read schema when searching for the latest Protocol and Metadata. */
public static final StructType PROTOCOL_METADATA_READ_SCHEMA = new StructType()
Expand Down
Expand Up @@ -22,6 +22,9 @@
import java.util.stream.Collectors;
import java.util.stream.LongStream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.delta.kernel.Snapshot;
import io.delta.kernel.TableNotFoundException;
import io.delta.kernel.client.TableClient;
Expand All @@ -35,15 +38,15 @@
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.lang.ListUtils;
import io.delta.kernel.internal.util.FileNames;
import io.delta.kernel.internal.util.Logging;
import io.delta.kernel.internal.util.Tuple2;
import static io.delta.kernel.internal.fs.Path.getName;
import static io.delta.kernel.internal.util.Preconditions.checkArgument;

public class SnapshotManager
implements Logging {
public class SnapshotManager {
public SnapshotManager() {}

private static final Logger logger = LoggerFactory.getLogger(SnapshotManager.class);

/**
* - Verify the versions are contiguous.
* - Verify the versions start with `expectedStartVersion` if it's specified.
Expand Down Expand Up @@ -98,7 +101,7 @@ private CloseableIterator<FileStatus> listFrom(
TableClient tableClient,
long startVersion)
throws IOException {
logDebug(String.format("startVersion: %s", startVersion));
logger.debug("startVersion: {}", startVersion);
return tableClient
.getFileSystemClient()
.listFrom(FileNames.listingPrefix(logPath, startVersion));
Expand Down Expand Up @@ -158,7 +161,7 @@ protected final Optional<List<FileStatus>> listDeltaAndCheckpointFiles(
TableClient tableClient,
long startVersion,
Optional<Long> versionToLoad) {
logDebug(String.format("startVersion: %s, versionToLoad: %s", startVersion, versionToLoad));
logger.debug("startVersion: {}, versionToLoad: {}", startVersion, versionToLoad);

return listFromOrNone(
logPath,
Expand Down Expand Up @@ -225,9 +228,9 @@ private SnapshotImpl createSnapshot(
TableClient tableClient) {
final String startingFromStr = initSegment
.checkpointVersionOpt
.map(v -> String.format(" starting from checkpoint version %s.", v))
.map(v -> String.format("starting from checkpoint version %s.", v))
.orElse(".");
logInfo(() -> String.format("Loading version %s%s", initSegment.version, startingFromStr));
logger.info("Loading version {} {}", initSegment.version, startingFromStr);

return new SnapshotImpl(
logPath,
Expand Down Expand Up @@ -324,13 +327,13 @@ protected Optional<LogSegment> getLogSegmentForVersion(
// recursive call to [[getLogSegmentForVersion]] below (same as before the refactor).
newFiles = Collections.emptyList();
}
logDebug(() ->
logger.atDebug().setMessage(() ->
String.format(
"newFiles: %s",
Arrays.toString(newFiles.stream()
.map(x -> new Path(x.getPath()).getName()).toArray())
)
);
).log();

if (newFiles.isEmpty() && !startCheckpointOpt.isPresent()) {
// We can't construct a snapshot because the directory contained no usable commit
Expand All @@ -356,31 +359,32 @@ protected Optional<LogSegment> getLogSegmentForVersion(
final List<FileStatus> checkpoints = checkpointsAndDeltas._1;
final List<FileStatus> deltas = checkpointsAndDeltas._2;

logDebug(() ->
logger.atDebug().setMessage(() ->
String.format(
"\ncheckpoints: %s\ndeltas: %s",
Arrays.toString(checkpoints.stream().map(
x -> new Path(x.getPath()).getName()).toArray()),
Arrays.toString(deltas.stream().map(
x -> new Path(x.getPath()).getName()).toArray())
)
);
).log();

// Find the latest checkpoint in the listing that is not older than the versionToLoad
final CheckpointInstance maxCheckpoint = versionToLoadOpt.map(CheckpointInstance::new)
.orElse(CheckpointInstance.MAX_VALUE);
logDebug(String.format("lastCheckpoint: %s", maxCheckpoint));
logger.debug("lastCheckpoint: {}", maxCheckpoint);

final List<CheckpointInstance> checkpointFiles = checkpoints
.stream()
.map(f -> new CheckpointInstance(f.getPath()))
.collect(Collectors.toList());
logDebug(() ->
String.format("checkpointFiles: %s", Arrays.toString(checkpointFiles.toArray())));
logger.atDebug().setMessage(() ->
String.format("checkpointFiles: %s", Arrays.toString(checkpointFiles.toArray()))
).log();

final Optional<CheckpointInstance> newCheckpointOpt =
Checkpointer.getLatestCompleteCheckpointFromList(checkpointFiles, maxCheckpoint);
logDebug(String.format("newCheckpointOpt: %s", newCheckpointOpt));
logger.debug("newCheckpointOpt: {}", newCheckpointOpt);

final long newCheckpointVersion = newCheckpointOpt
.map(c -> c.version)
Expand Down Expand Up @@ -415,7 +419,7 @@ protected Optional<LogSegment> getLogSegmentForVersion(

return -1L;
});
logDebug(String.format("newCheckpointVersion: %s", newCheckpointVersion));
logger.debug("newCheckpointVersion: {}", newCheckpointVersion);

// TODO: we can calculate deltasAfterCheckpoint and deltaVersions more efficiently
// If there is a new checkpoint, start new lineage there. If `newCheckpointVersion` is -1,
Expand All @@ -427,23 +431,24 @@ protected Optional<LogSegment> getLogSegmentForVersion(
new Path(fileStatus.getPath())) > newCheckpointVersion)
.collect(Collectors.toList());

logDebug(() ->
logger.atDebug().setMessage(() ->
String.format(
"deltasAfterCheckpoint: %s",
Arrays.toString(deltasAfterCheckpoint.stream().map(
x -> new Path(x.getPath()).getName()).toArray())
)
);
).log();

// todo again naming confusing (specify after checkpoint?)
final LinkedList<Long> deltaVersionsAfterCheckpoint = deltasAfterCheckpoint
.stream()
.map(fileStatus -> FileNames.deltaVersion(new Path(fileStatus.getPath())))
.collect(Collectors.toCollection(LinkedList::new));

logDebug(() ->
logger.atDebug().setMessage(() ->
String.format("deltaVersions: %s",
Arrays.toString(deltaVersionsAfterCheckpoint.toArray())));
Arrays.toString(deltaVersionsAfterCheckpoint.toArray()))
).log();

// We may just be getting a checkpoint file after the filtering
if (!deltaVersionsAfterCheckpoint.isEmpty()) {
Expand Down

This file was deleted.

44 changes: 44 additions & 0 deletions kernel/kernel-api/src/test/resources/log4j.properties
@@ -0,0 +1,44 @@
#
# Copyright (2023) The Delta Lake Project Authors.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Set everything to be logged to the file target/unit-tests.log
test.appender=file
log4j.rootCategory=info, ${test.appender}
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.append=true
log4j.appender.file.file=target/unit-tests.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n

# Tests that launch java subprocesses can set the "test.appender" system property to
# "console" to avoid having the child process's logs overwrite the unit test's
# log file.
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%t: %m%n
44 changes: 44 additions & 0 deletions kernel/kernel-defaults/src/test/resources/log4j.properties
@@ -0,0 +1,44 @@
#
# Copyright (2023) The Delta Lake Project Authors.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Set everything to be logged to the file target/unit-tests.log
test.appender=file
log4j.rootCategory=info, ${test.appender}
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.append=true
log4j.appender.file.file=target/unit-tests.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n

# Tests that launch java subprocesses can set the "test.appender" system property to
# "console" to avoid having the child process's logs overwrite the unit test's
# log file.
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%t: %m%n

0 comments on commit 13f7fbc

Please sign in to comment.