KAFKA-3968: fsync the parent directory of a segment file when the file is created #10405

Merged 27 commits on Apr 3, 2021
Commits (27):
459facb - [KAFKA-3968] fsync the parent directory of a segment file when the fi… (ccding, Mar 25, 2021)
b4c8284 - move import (ccding, Mar 26, 2021)
b260169 - address comments (except the topic partition one) (ccding, Mar 29, 2021)
ba086e9 - remove import (ccding, Mar 29, 2021)
2a19e0e - reuse the function in utils.java (ccding, Mar 30, 2021)
40a1abe - simplify logic (ccding, Mar 30, 2021)
1ac80b6 - default changeFileSuffixes flush to true (ccding, Mar 30, 2021)
09cac0b - flush when mkdirs (ccding, Mar 30, 2021)
5be95aa - revert accidential change (ccding, Mar 30, 2021)
c9448c8 - atomicMoveWithFallback (ccding, Mar 30, 2021)
daeb698 - no flush parent dir in test (ccding, Mar 30, 2021)
0d4800b - check null pointer (ccding, Mar 31, 2021)
95a6c3f - fix unit test error (ccding, Mar 31, 2021)
8c859f3 - set flag after flush (ccding, Apr 1, 2021)
fdc1faa - disable flushing on renameTo (ccding, Apr 1, 2021)
6795ec9 - address comments based on offline discussion with Jun (ccding, Apr 1, 2021)
55ae3bc - Merge branch 'trunk' into fsync (ccding, Apr 1, 2021)
e653af4 - check hadCleanShutdown for open FileRecord (ccding, Apr 1, 2021)
85861ee - address comments (ccding, Apr 1, 2021)
1578678 - fix default values (ccding, Apr 1, 2021)
fffc353 - more default value (ccding, Apr 1, 2021)
f66c545 - do flush in the LogSegment class (ccding, Apr 2, 2021)
56be9d8 - Merge branch 'trunk' into fsync (ccding, Apr 2, 2021)
1ecf94b - remove parameter from FileRecord.open (ccding, Apr 2, 2021)
080a79a - default to false (ccding, Apr 2, 2021)
61eee4a - add param to javadoc (ccding, Apr 2, 2021)
7543938 - during flush -> during the next flush (ccding, Apr 2, 2021)
clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
@@ -197,6 +197,13 @@ public void flush() throws IOException {
channel.force(true);
}

/**
* Flush the parent directory of a file to the physical disk, which makes sure the file is accessible after crashing.
*/
public void flushParentDir() throws IOException {
Utils.flushParentDir(file.toPath());
}

/**
* Close this record set
*/
@@ -245,7 +252,7 @@ public void updateParentDir(File parentDir) {
*/
public void renameTo(File f) throws IOException {
try {
Utils.atomicMoveWithFallback(file.toPath(), f.toPath());
Utils.atomicMoveWithFallback(file.toPath(), f.toPath(), false);
} finally {
this.file = f;
}
37 changes: 37 additions & 0 deletions clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -17,6 +17,7 @@
package org.apache.kafka.common.utils;

import java.nio.BufferUnderflowException;
import java.nio.file.StandardOpenOption;
import java.util.AbstractMap;
import java.util.EnumSet;
import java.util.SortedSet;
@@ -893,10 +894,23 @@ public static ClassLoader getContextOrKafkaClassLoader() {

/**
* Attempts to move source to target atomically and falls back to a non-atomic move if it fails.
* This function also flushes the parent directory to guarantee crash consistency.
*
* @throws IOException if both atomic and non-atomic moves fail
*/
public static void atomicMoveWithFallback(Path source, Path target) throws IOException {
atomicMoveWithFallback(source, target, true);
}

/**
* Attempts to move source to target atomically and falls back to a non-atomic move if it fails.
* This function allows callers to decide whether to flush the parent directory. This is needed
* when a sequence of atomicMoveWithFallback is called for the same directory and we don't want
* to repeatedly flush the same parent directory.
*
* @throws IOException if both atomic and non-atomic moves fail
*/
public static void atomicMoveWithFallback(Path source, Path target, boolean needFlushParentDir) throws IOException {
try {
Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);
} catch (IOException outer) {
@@ -908,6 +922,29 @@ public static void atomicMoveWithFallback(Path source, Path target) throws IOException {
inner.addSuppressed(outer);
throw inner;
}
} finally {
if (needFlushParentDir) {
flushParentDir(target);
}
}
}

/**
* Flushes the parent directory to guarantee crash consistency.
*
* @throws IOException if flushing the parent directory fails.
*/
public static void flushParentDir(Path path) throws IOException {
FileChannel dir = null;
try {
Path parent = path.toAbsolutePath().getParent();
if (parent != null) {
dir = FileChannel.open(parent, StandardOpenOption.READ);
dir.force(true);
}
} finally {
if (dir != null)
dir.close();
}
}

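For context, the whole patch hinges on one filesystem detail: fsyncing a file makes its contents durable, but the file's directory entry is only durable once the parent directory is fsynced too. Below is a minimal, self-contained sketch of that pattern with an invented class name and placeholder path; only the FileChannel.open(parent, READ).force(true) sequence mirrors the new Utils.flushParentDir.

import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

// Hypothetical demo class, not part of the patch.
public class ParentDirFsyncDemo {
    public static void main(String[] args) throws IOException {
        Path file = Paths.get("/tmp/kafka-fsync-demo/00000000000000000000.log"); // placeholder path
        Files.createDirectories(file.getParent());
        Files.write(file, new byte[] {1, 2, 3});

        // Forcing the file's own channel makes its contents durable...
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.WRITE)) {
            ch.force(true);
        }

        // ...but the directory entry (the file's name) only becomes durable once the
        // parent directory is itself fsynced, which is what Utils.flushParentDir does.
        Path parent = file.toAbsolutePath().getParent();
        if (parent != null) {
            try (FileChannel dir = FileChannel.open(parent, StandardOpenOption.READ)) {
                dir.force(true);
            }
        }
    }
}
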
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/log/AbstractIndex.scala
@@ -207,7 +207,7 @@ abstract class AbstractIndex(@volatile private var _file: File, val baseOffset:
* @throws IOException if rename fails
*/
def renameTo(f: File): Unit = {
try Utils.atomicMoveWithFallback(file.toPath, f.toPath)
try Utils.atomicMoveWithFallback(file.toPath, f.toPath, false)
finally _file = f
}

2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/log/LazyIndex.scala
@@ -131,7 +131,7 @@ object LazyIndex {
def updateParentDir(parentDir: File): Unit = _file = new File(parentDir, file.getName)

def renameTo(f: File): Unit = {
try Utils.atomicMoveWithFallback(file.toPath, f.toPath)
try Utils.atomicMoveWithFallback(file.toPath, f.toPath, false)
catch {
case _: NoSuchFileException if !file.exists => ()
}
7 changes: 4 additions & 3 deletions core/src/main/scala/kafka/log/Log.scala
@@ -708,7 +708,8 @@ class Log(@volatile private var _dir: File,
baseOffset = baseOffset,
config,
time = time,
fileAlreadyExists = true)
fileAlreadyExists = true,
needsRecovery = !hadCleanShutdown)

try segment.sanityCheck(timeIndexFileNewlyCreated)
catch {
@@ -2325,7 +2326,7 @@ class Log(@volatile private var _dir: File,
* @throws IOException if the file can't be renamed and still exists
*/
private def deleteSegmentFiles(segments: Iterable[LogSegment], asyncDelete: Boolean, deleteProducerStateSnapshots: Boolean = true): Unit = {
segments.foreach(_.changeFileSuffixes("", Log.DeletedFileSuffix))
segments.foreach(_.changeFileSuffixes("", Log.DeletedFileSuffix, false))

def deleteSegments(): Unit = {
info(s"Deleting segment files ${segments.mkString(",")}")
@@ -2388,7 +2389,7 @@ class Log(@volatile private var _dir: File,
// need to do this in two phases to be crash safe AND do the delete asynchronously
// if we crash in the middle of this we complete the swap in loadSegments()
if (!isRecoveredSwapFile)
sortedNewSegments.reverse.foreach(_.changeFileSuffixes(Log.CleanedFileSuffix, Log.SwapFileSuffix))
sortedNewSegments.reverse.foreach(_.changeFileSuffixes(Log.CleanedFileSuffix, Log.SwapFileSuffix, false))
sortedNewSegments.reverse.foreach(addSegment(_))
val newSegmentBaseOffsets = sortedNewSegments.map(_.baseOffset).toSet

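The false arguments threaded through Log.scala above follow the rationale in the new Utils javadoc: when several files in the same directory are renamed back to back, one directory fsync at the end is enough. A rough sketch of that call pattern, using the Utils methods added or changed in this PR but with an invented helper and file list:

import java.io.IOException;
import java.nio.file.Path;
import java.util.List;

import org.apache.kafka.common.utils.Utils;

// Hypothetical helper illustrating the "rename many, flush the parent once" pattern.
public final class BatchRenameExample {
    static void renameAllThenFlushOnce(List<Path> sources, String newSuffix) throws IOException {
        Path lastTarget = null;
        for (Path source : sources) {
            Path target = source.resolveSibling(source.getFileName() + newSuffix);
            // Skip the per-call directory flush; all targets share the same parent.
            Utils.atomicMoveWithFallback(source, target, false);
            lastTarget = target;
        }
        // One directory fsync covers every rename above.
        if (lastTarget != null)
            Utils.flushParentDir(lastTarget);
    }
}
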
4 changes: 3 additions & 1 deletion core/src/main/scala/kafka/log/LogManager.scala
@@ -28,7 +28,7 @@ import kafka.server.metadata.ConfigRepository
import kafka.server._
import kafka.utils._
import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.errors.{InconsistentTopicIdException, KafkaStorageException, LogDirNotFoundException}

import scala.jdk.CollectionConverters._
@@ -150,6 +150,7 @@ class LogManager(logDirs: Seq[File],
val created = dir.mkdirs()
if (!created)
throw new IOException(s"Failed to create data directory ${dir.getAbsolutePath}")
Utils.flushParentDir(dir.toPath)
}
if (!dir.isDirectory || !dir.canRead)
throw new IOException(s"${dir.getAbsolutePath} is not a readable log directory.")
@@ -866,6 +867,7 @@ class LogManager(logDirs: Seq[File],
val dir = new File(logDirPath, logDirName)
try {
Files.createDirectories(dir.toPath)
Utils.flushParentDir(dir.toPath)
Contributor commented:

I am wondering if we should flush the parent dir when we delete a log too. This is not strictly required for every delete. So one option is to flush every parent dir when closing the LogManager.

Contributor Author replied:

Per our offline discussion, we decided not to flush at deletion. Deletions are async and can be retried after rebooting.

Success(dir)
} catch {
case e: IOException =>
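The LogManager changes apply the same idea to directory creation: a freshly created log directory is only guaranteed to survive a crash once its own parent has been fsynced. A small sketch of that pattern with a placeholder path (LogManager itself works from the configured log dirs):

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

import org.apache.kafka.common.utils.Utils;

// Hypothetical example; only createDirectories followed by flushParentDir mirrors the patch.
public class CreateDirDurablyExample {
    public static void main(String[] args) throws IOException {
        Path newLogDir = Paths.get("/tmp/kafka-logs/my-topic-0"); // placeholder location
        Files.createDirectories(newLogDir);
        // Without this, a crash right after mkdir could leave the new directory
        // entry unrecorded in its parent on some filesystems.
        Utils.flushParentDir(newLogDir);
    }
}
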
22 changes: 18 additions & 4 deletions core/src/main/scala/kafka/log/LogSegment.scala
@@ -20,6 +20,7 @@ import java.io.{File, IOException}
import java.nio.file.{Files, NoSuchFileException}
import java.nio.file.attribute.FileTime
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import kafka.common.LogSegmentOffsetOverflowException
import kafka.metrics.{KafkaMetricsGroup, KafkaTimer}
import kafka.server.epoch.LeaderEpochFileCache
@@ -50,6 +51,7 @@ import scala.math._
* @param indexIntervalBytes The approximate number of bytes between entries in the index
* @param rollJitterMs The maximum random jitter subtracted from the scheduled segment roll time
* @param time The time instance
* @param needsFlushParentDir Whether or not we need to flush the parent directory during the first flush
*/
@nonthreadsafe
class LogSegment private[log] (val log: FileRecords,
@@ -59,7 +61,8 @@ class LogSegment private[log] (val log: FileRecords,
val baseOffset: Long,
val indexIntervalBytes: Int,
val rollJitterMs: Long,
val time: Time) extends Logging {
val time: Time,
val needsFlushParentDir: Boolean = false) extends Logging {
Contributor commented:

Could we add the new param to the javadoc?

Contributor Author replied:

Done.


def offsetIndex: OffsetIndex = lazyOffsetIndex.get

@@ -95,6 +98,9 @@ class LogSegment private[log] (val log: FileRecords,
/* the number of bytes since we last added an entry in the offset index */
private var bytesSinceLastIndexEntry = 0

/* whether or not we need to flush the parent dir during the next flush */
private val atomicNeedsFlushParentDir = new AtomicBoolean(needsFlushParentDir)

// The timestamp we used for time based log rolling and for ensuring max compaction delay
// volatile for LogCleaner to see the update
@volatile private var rollingBasedTimestamp: Option[Long] = None
@@ -472,6 +478,9 @@ class LogSegment private[log] (val log: FileRecords,
offsetIndex.flush()
timeIndex.flush()
txnIndex.flush()
// We only need to flush the parent of the log file because all other files share the same parent
if (atomicNeedsFlushParentDir.getAndSet(false))
log.flushParentDir()
}
}

@@ -490,11 +499,14 @@
* Change the suffix for the index and log files for this log segment
* IOException from this method should be handled by the caller
*/
def changeFileSuffixes(oldSuffix: String, newSuffix: String): Unit = {
def changeFileSuffixes(oldSuffix: String, newSuffix: String, needsFlushParentDir: Boolean = true): Unit = {
log.renameTo(new File(CoreUtils.replaceSuffix(log.file.getPath, oldSuffix, newSuffix)))
lazyOffsetIndex.renameTo(new File(CoreUtils.replaceSuffix(lazyOffsetIndex.file.getPath, oldSuffix, newSuffix)))
lazyTimeIndex.renameTo(new File(CoreUtils.replaceSuffix(lazyTimeIndex.file.getPath, oldSuffix, newSuffix)))
txnIndex.renameTo(new File(CoreUtils.replaceSuffix(txnIndex.file.getPath, oldSuffix, newSuffix)))
// We only need to flush the parent of the log file because all other files share the same parent
if (needsFlushParentDir)
log.flushParentDir()
}

/**
@@ -657,7 +669,8 @@ class LogSegment private[log] (val log: FileRecords,
object LogSegment {

def open(dir: File, baseOffset: Long, config: LogConfig, time: Time, fileAlreadyExists: Boolean = false,
initFileSize: Int = 0, preallocate: Boolean = false, fileSuffix: String = ""): LogSegment = {
initFileSize: Int = 0, preallocate: Boolean = false, fileSuffix: String = "",
needsRecovery: Boolean = false): LogSegment = {
val maxIndexSize = config.maxIndexSize
new LogSegment(
FileRecords.open(Log.logFile(dir, baseOffset, fileSuffix), fileAlreadyExists, initFileSize, preallocate),
Expand All @@ -667,7 +680,8 @@ object LogSegment {
baseOffset,
indexIntervalBytes = config.indexInterval,
rollJitterMs = config.randomSegmentJitter,
time)
time,
needsFlushParentDir = needsRecovery || !fileAlreadyExists)
}

def deleteIfExists(dir: File, baseOffset: Long, fileSuffix: String = ""): Unit = {
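The LogSegment change above defers the directory fsync to the first flush and uses an AtomicBoolean so it happens at most once even under concurrent flushes. A stripped-down Java sketch of that guard; the wrapper class and constructor are illustrative, and only FileRecords.flush plus the new FileRecords.flushParentDir come from the patch:

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.common.record.FileRecords;

// Illustrative wrapper showing the getAndSet(false) guard used by LogSegment.flush().
public class FlushOnceGuardExample {
    private final FileRecords log;
    private final AtomicBoolean needsFlushParentDir;

    public FlushOnceGuardExample(FileRecords log, boolean needsFlushParentDir) {
        this.log = log;
        this.needsFlushParentDir = new AtomicBoolean(needsFlushParentDir);
    }

    public void flush() throws IOException {
        log.flush(); // fsync the segment data itself
        // Only the first flush after creation/recovery pays for the directory fsync;
        // getAndSet makes the hand-off race-free if two threads flush concurrently.
        if (needsFlushParentDir.getAndSet(false))
            log.flushParentDir();
    }
}
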
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/log/TransactionIndex.scala
@@ -109,7 +109,7 @@ class TransactionIndex(val startOffset: Long, @volatile private var _file: File)
def renameTo(f: File): Unit = {
try {
if (file.exists)
Utils.atomicMoveWithFallback(file.toPath, f.toPath)
Utils.atomicMoveWithFallback(file.toPath, f.toPath, false)
} finally _file = f
}

2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
@@ -300,7 +300,7 @@ final class KafkaMetadataLog private (
val path = Snapshots.snapshotPath(log.dir.toPath, snapshotId)
val destination = Snapshots.deleteRename(path, snapshotId)
try {
Utils.atomicMoveWithFallback(path, destination)
Utils.atomicMoveWithFallback(path, destination, false)
} catch {
case e: IOException =>
error(s"Error renaming snapshot file: $path to $destination", e)
4 changes: 2 additions & 2 deletions core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -1370,7 +1370,7 @@ class LogCleanerTest {
// On recovery, clean operation is aborted. All messages should be present in the log
log.logSegments.head.changeFileSuffixes("", Log.CleanedFileSuffix)
for (file <- dir.listFiles if file.getName.endsWith(Log.DeletedFileSuffix)) {
Utils.atomicMoveWithFallback(file.toPath, Paths.get(CoreUtils.replaceSuffix(file.getPath, Log.DeletedFileSuffix, "")))
Utils.atomicMoveWithFallback(file.toPath, Paths.get(CoreUtils.replaceSuffix(file.getPath, Log.DeletedFileSuffix, "")), false)
}
log = recoverAndCheck(config, allKeys)

@@ -1386,7 +1386,7 @@
// renamed to .deleted. Clean operation is resumed during recovery.
log.logSegments.head.changeFileSuffixes("", Log.SwapFileSuffix)
for (file <- dir.listFiles if file.getName.endsWith(Log.DeletedFileSuffix)) {
Utils.atomicMoveWithFallback(file.toPath, Paths.get(CoreUtils.replaceSuffix(file.getPath, Log.DeletedFileSuffix, "")))
Utils.atomicMoveWithFallback(file.toPath, Paths.get(CoreUtils.replaceSuffix(file.getPath, Log.DeletedFileSuffix, "")), false)
}
log = recoverAndCheck(config, cleanedKeys)

6 changes: 3 additions & 3 deletions core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -3431,7 +3431,7 @@ class LogTest {
segment.truncateTo(0)
})
for (file <- logDir.listFiles if file.getName.endsWith(Log.DeletedFileSuffix))
Utils.atomicMoveWithFallback(file.toPath, Paths.get(CoreUtils.replaceSuffix(file.getPath, Log.DeletedFileSuffix, "")))
Utils.atomicMoveWithFallback(file.toPath, Paths.get(CoreUtils.replaceSuffix(file.getPath, Log.DeletedFileSuffix, "")), false)

val recoveredLog = recoverAndCheck(logConfig, expectedKeys)
assertEquals(expectedKeys, LogTest.keysInLog(recoveredLog))
@@ -3459,7 +3459,7 @@ class LogTest {
segment.truncateTo(0)
}
for (file <- logDir.listFiles if file.getName.endsWith(Log.DeletedFileSuffix))
Utils.atomicMoveWithFallback(file.toPath, Paths.get(CoreUtils.replaceSuffix(file.getPath, Log.DeletedFileSuffix, "")))
Utils.atomicMoveWithFallback(file.toPath, Paths.get(CoreUtils.replaceSuffix(file.getPath, Log.DeletedFileSuffix, "")), false)

val recoveredLog = recoverAndCheck(logConfig, expectedKeys)
assertEquals(expectedKeys, LogTest.keysInLog(recoveredLog))
@@ -3483,7 +3483,7 @@
segment.changeFileSuffixes("", Log.SwapFileSuffix)
})
for (file <- logDir.listFiles if file.getName.endsWith(Log.DeletedFileSuffix))
Utils.atomicMoveWithFallback(file.toPath, Paths.get(CoreUtils.replaceSuffix(file.getPath, Log.DeletedFileSuffix, "")))
Utils.atomicMoveWithFallback(file.toPath, Paths.get(CoreUtils.replaceSuffix(file.getPath, Log.DeletedFileSuffix, "")), false)

// Truncate the old segment
segmentWithOverflow.truncateTo(0)
@@ -116,7 +116,7 @@ public void testDeleteSnapshot(boolean renameBeforeDeleting) throws IOException

if (renameBeforeDeleting)
// rename snapshot before deleting
Utils.atomicMoveWithFallback(snapshotPath, Snapshots.deleteRename(snapshotPath, snapshotId));
Utils.atomicMoveWithFallback(snapshotPath, Snapshots.deleteRename(snapshotPath, snapshotId), false);

assertTrue(Snapshots.deleteSnapshotIfExists(logDirPath, snapshot.snapshotId()));
assertFalse(Files.exists(snapshotPath));