From e4c94793bd0bd82baf9c3be61046b719182878bf Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Thu, 14 Jan 2016 08:45:49 +0000 Subject: [PATCH 1/3] Use `Utils.atomicMoveWithFallback` instead of `File.rename` It behaves better on Windows and provides more useful error messages. --- .../main/scala/kafka/log/FileMessageSet.scala | 10 +++++----- .../src/main/scala/kafka/log/LogSegment.scala | 20 ++++++++++++------- .../main/scala/kafka/log/OffsetIndex.scala | 11 +++++----- .../server/BrokerMetadataCheckpoint.scala | 10 ++-------- .../scala/unit/kafka/log/CleanerTest.scala | 7 ++++--- .../kafka/streams/state/OffsetCheckpoint.java | 10 +--------- 6 files changed, 31 insertions(+), 37 deletions(-) diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala index b239a6ce3c256..d4ce4986dff3f 100755 --- a/core/src/main/scala/kafka/log/FileMessageSet.scala +++ b/core/src/main/scala/kafka/log/FileMessageSet.scala @@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit import kafka.metrics.{KafkaTimer, KafkaMetricsGroup} import org.apache.kafka.common.errors.CorruptRecordException import org.apache.kafka.common.network.TransportLayer +import org.apache.kafka.common.utils.Utils /** * An on-disk message set. An optional start and end position can be applied to the message set @@ -291,12 +292,11 @@ class FileMessageSet private[kafka](@volatile var file: File, /** * Rename the file that backs this message set - * @return true iff the rename was successful + * @throws IOException if rename fails. */ - def renameTo(f: File): Boolean = { - val success = this.file.renameTo(f) - this.file = f - success + def renameTo(f: File) { + try Utils.atomicMoveWithFallback(file.toPath, f.toPath) + finally this.file = f } } diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala index d604d6c131cad..aa37d52a0ef7a 100755 --- a/core/src/main/scala/kafka/log/LogSegment.scala +++ b/core/src/main/scala/kafka/log/LogSegment.scala @@ -23,7 +23,7 @@ import kafka.server.{LogOffsetMetadata, FetchDataInfo} import org.apache.kafka.common.errors.CorruptRecordException import scala.math._ -import java.io.File +import java.io.{IOException, File} /** @@ -256,12 +256,18 @@ class LogSegment(val log: FileMessageSet, * Change the suffix for the index and log file for this log segment */ def changeFileSuffixes(oldSuffix: String, newSuffix: String) { - val logRenamed = log.renameTo(new File(CoreUtils.replaceSuffix(log.file.getPath, oldSuffix, newSuffix))) - if(!logRenamed) - throw new KafkaStorageException("Failed to change the log file suffix from %s to %s for log segment %d".format(oldSuffix, newSuffix, baseOffset)) - val indexRenamed = index.renameTo(new File(CoreUtils.replaceSuffix(index.file.getPath, oldSuffix, newSuffix))) - if(!indexRenamed) - throw new KafkaStorageException("Failed to change the index file suffix from %s to %s for log segment %d".format(oldSuffix, newSuffix, baseOffset)) + + def kafkaStorageException(fileType: String, e: IOException) = + new KafkaStorageException(s"Failed to change the $fileType file suffix from $oldSuffix to $newSuffix for log segment $baseOffset", e) + + try log.renameTo(new File(CoreUtils.replaceSuffix(log.file.getPath, oldSuffix, newSuffix))) + catch { + case e: IOException => throw kafkaStorageException("log", e) + } + try index.renameTo(new File(CoreUtils.replaceSuffix(index.file.getPath, oldSuffix, newSuffix))) + catch { + case e: IOException => throw kafkaStorageException("index", e) + } } /** diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala index 84d18bd4a1ffc..e95c9d139ecd5 100755 --- a/core/src/main/scala/kafka/log/OffsetIndex.scala +++ b/core/src/main/scala/kafka/log/OffsetIndex.scala @@ -17,6 +17,8 @@ package kafka.log +import org.apache.kafka.common.utils.Utils + import scala.math._ import java.io._ import java.nio._ @@ -338,12 +340,11 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi /** * Rename the file that backs this offset index - * @return true iff the rename was successful + * @throws IOException if rename fails */ - def renameTo(f: File): Boolean = { - val success = this.file.renameTo(f) - this.file = f - success + def renameTo(f: File) { + try Utils.atomicMoveWithFallback(file.toPath, f.toPath) + finally this.file = f } /** diff --git a/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala b/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala index 6e8d68dfae20d..00e5d0caf16f7 100755 --- a/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala +++ b/core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala @@ -43,16 +43,10 @@ class BrokerMetadataCheckpoint(val file: File) extends Logging { fileOutputStream.flush() fileOutputStream.getFD().sync() fileOutputStream.close() - // swap new BrokerMetadata file with previous one - if(!temp.renameTo(file)) { - // renameTo() fails on windows if destination file exists. - file.delete() - if(!temp.renameTo(file)) - throw new IOException("File rename from %s to %s failed.".format(temp.getAbsolutePath(), file.getAbsolutePath())) - } + Utils.atomicMoveWithFallback(temp.toPath, file.toPath) } catch { case ie: IOException => - error("Failed to write meta.properties due to ",ie) + error("Failed to write meta.properties due to", ie) throw ie } } diff --git a/core/src/test/scala/unit/kafka/log/CleanerTest.scala b/core/src/test/scala/unit/kafka/log/CleanerTest.scala index 8ab9f91e82dba..a8092de40b928 100755 --- a/core/src/test/scala/unit/kafka/log/CleanerTest.scala +++ b/core/src/test/scala/unit/kafka/log/CleanerTest.scala @@ -19,6 +19,7 @@ package kafka.log import java.io.File import java.nio._ +import java.nio.file.Paths import java.util.Properties import java.util.concurrent.atomic.AtomicLong @@ -376,7 +377,7 @@ class CleanerTest extends JUnitSuite { // On recovery, clean operation is aborted. All messages should be present in the log log.logSegments.head.changeFileSuffixes("", Log.CleanedFileSuffix) for (file <- dir.listFiles if file.getName.endsWith(Log.DeletedFileSuffix)) { - file.renameTo(new File(CoreUtils.replaceSuffix(file.getPath, Log.DeletedFileSuffix, ""))) + Utils.atomicMoveWithFallback(file.toPath, Paths.get(CoreUtils.replaceSuffix(file.getPath, Log.DeletedFileSuffix, ""))) } log = recoverAndCheck(config, allKeys) @@ -388,7 +389,7 @@ class CleanerTest extends JUnitSuite { // renamed to .deleted. Clean operation is resumed during recovery. log.logSegments.head.changeFileSuffixes("", Log.SwapFileSuffix) for (file <- dir.listFiles if file.getName.endsWith(Log.DeletedFileSuffix)) { - file.renameTo(new File(CoreUtils.replaceSuffix(file.getPath, Log.DeletedFileSuffix, ""))) + Utils.atomicMoveWithFallback(file.toPath, Paths.get(CoreUtils.replaceSuffix(file.getPath, Log.DeletedFileSuffix, ""))) } log = recoverAndCheck(config, cleanedKeys) @@ -478,4 +479,4 @@ class FakeOffsetMap(val slots: Int) extends OffsetMap { def size: Int = map.size -} \ No newline at end of file +} diff --git a/streams/src/main/java/org/apache/kafka/streams/state/OffsetCheckpoint.java b/streams/src/main/java/org/apache/kafka/streams/state/OffsetCheckpoint.java index e04de68afde8c..6a201ea35d133 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/OffsetCheckpoint.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/OffsetCheckpoint.java @@ -82,15 +82,7 @@ public void write(Map offsets) throws IOException { writer.close(); } - // swap new offset checkpoint file with previous one - if (!temp.renameTo(file)) { - // renameTo() fails on Windows if the destination file exists. - file.delete(); - if (!temp.renameTo(file)) - throw new IOException(String.format("File rename from %s to %s failed.", - temp.getAbsolutePath(), - file.getAbsolutePath())); - } + Utils.atomicMoveWithFallback(temp.toPath(), file.toPath()); } } From e9894b9691f8d865b8c1a3afe989ae17ccbf15fe Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Thu, 14 Jan 2016 08:47:08 +0000 Subject: [PATCH 2/3] Minor inconsistency fix in `OffsetCheckpoint.malformedLineException` --- core/src/main/scala/kafka/server/OffsetCheckpoint.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/server/OffsetCheckpoint.scala b/core/src/main/scala/kafka/server/OffsetCheckpoint.scala index fe1d8235ceb80..77f283c72b013 100644 --- a/core/src/main/scala/kafka/server/OffsetCheckpoint.scala +++ b/core/src/main/scala/kafka/server/OffsetCheckpoint.scala @@ -71,7 +71,7 @@ class OffsetCheckpoint(val file: File) extends Logging { def read(): Map[TopicAndPartition, Long] = { def malformedLineException(line: String) = - throw new IOException(s"Malformed line in offset checkpoint file: $line'") + new IOException(s"Malformed line in offset checkpoint file: $line'") lock synchronized { val reader = new BufferedReader(new FileReader(file)) @@ -104,7 +104,7 @@ class OffsetCheckpoint(val file: File) extends Logging { throw new IOException("Unrecognized version of the highwatermark checkpoint file: " + version) } } catch { - case e: NumberFormatException => malformedLineException(line) + case e: NumberFormatException => throw malformedLineException(line) } finally { reader.close() } From 29372fa2d3fbe4cfb5b4b88184539e5c9ac405b2 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Thu, 14 Jan 2016 08:48:44 +0000 Subject: [PATCH 3/3] Remove delete from `streams.state.OffsetCheckpoint` constructor This is similar to the change in kafka.server.OffsetCheckpoint. --- .../org/apache/kafka/streams/state/OffsetCheckpoint.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/OffsetCheckpoint.java b/streams/src/main/java/org/apache/kafka/streams/state/OffsetCheckpoint.java index 6a201ea35d133..d748aac10fd17 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/OffsetCheckpoint.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/OffsetCheckpoint.java @@ -18,6 +18,7 @@ package org.apache.kafka.streams.state; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.Utils; import java.io.BufferedReader; import java.io.BufferedWriter; @@ -55,7 +56,6 @@ public class OffsetCheckpoint { private final Object lock; public OffsetCheckpoint(File file) throws IOException { - new File(file + ".tmp").delete(); // try to delete any existing temp files for cleanliness this.file = file; this.lock = new Object(); } @@ -71,11 +71,9 @@ public void write(Map offsets) throws IOException { writeIntLine(writer, VERSION); writeIntLine(writer, offsets.size()); - // write the entries for (Map.Entry entry : offsets.entrySet()) writeEntry(writer, entry.getKey(), entry.getValue()); - // flush the buffer and then fsync the underlying file writer.flush(); fileOutputStream.getFD().sync(); } finally { @@ -114,7 +112,7 @@ public Map read() throws IOException { switch (version) { case 0: int expectedSize = readInt(reader); - Map offsets = new HashMap(); + Map offsets = new HashMap<>(); String line = reader.readLine(); while (line != null) { String[] pieces = line.split("\\s+");