From e3f5f293104013da3aa765c9ba7cbdb2553f6485 Mon Sep 17 00:00:00 2001 From: Ishita Mandhan Date: Thu, 25 Aug 2016 14:38:16 -0700 Subject: [PATCH] KAFKA-3940 Log should check the return value of dir.mkdirs() --- core/src/main/scala/kafka/log/Log.scala | 10 +++++++++- core/src/main/scala/kafka/log/LogManager.scala | 15 +++++++++++---- .../kafka/metrics/KafkaCSVMetricsReporter.scala | 11 +++++++++-- .../kafka/log/LogCleanerIntegrationTest.scala | 11 ++++++++--- .../scala/unit/kafka/log/LogManagerTest.scala | 8 +++++++- core/src/test/scala/unit/kafka/log/LogTest.scala | 16 +++++++++++++--- .../test/scala/unit/kafka/utils/TestUtils.scala | 9 +++++++-- 7 files changed, 64 insertions(+), 16 deletions(-) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index d343d6f4dd63..c7bcb3500990 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -24,6 +24,7 @@ import kafka.common._ import kafka.metrics.KafkaMetricsGroup import kafka.server.{BrokerTopicStats, FetchDataInfo, LogOffsetMetadata} import java.io.{File, IOException} +import java.nio.file.Files import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap} import java.util.concurrent.atomic._ import java.text.NumberFormat @@ -146,7 +147,14 @@ class Log(val dir: File, /* Load the log segments from the log files on disk */ private def loadSegments() { // create the log directory if it doesn't exist - dir.mkdirs() + if (!Files.exists(dir.toPath())) { + try { + Files.createDirectories(dir.toPath) + } catch { + case e: IOException => throw new KafkaException(s"Error in creating log directory ${dir.toPath}", e) + } + } + var swapFiles = Set[File]() // first do a pass through the files in the log directory and remove any temporary files diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 7806edad5ad6..5271452311f4 100755 --- 
a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -18,6 +18,7 @@ package kafka.log import java.io._ +import java.nio.file.Files import java.util.concurrent.TimeUnit import kafka.utils._ @@ -81,9 +82,11 @@ class LogManager(val logDirs: Array[File], for(dir <- dirs) { if(!dir.exists) { info("Log directory '" + dir.getAbsolutePath + "' not found, creating it.") - val created = dir.mkdirs() - if(!created) - throw new KafkaException("Failed to create data directory " + dir.getAbsolutePath) + try { + Files.createDirectories(dir.toPath) + } catch { + case e: IOException => throw new KafkaException(s"Error in creating log directory ${dir.toPath}", e) + } } if(!dir.isDirectory || !dir.canRead) throw new KafkaException(dir.getAbsolutePath + " is not a readable log directory.") @@ -358,7 +361,11 @@ class LogManager(val logDirs: Array[File], // if not, create it val dataDir = nextLogDir() val dir = new File(dataDir, topicAndPartition.topic + "-" + topicAndPartition.partition) - dir.mkdirs() + try { + Files.createDirectories(dir.toPath) + } catch { + case e: IOException => throw new KafkaException(s"Error in creating log directory ${dir.toPath}", e) + } log = new Log(dir, config, recoveryPoint = 0L, diff --git a/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala b/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala index 686f692548d9..efb19a5184b7 100755 --- a/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala +++ b/core/src/main/scala/kafka/metrics/KafkaCSVMetricsReporter.scala @@ -21,11 +21,13 @@ package kafka.metrics import com.yammer.metrics.Metrics -import java.io.File +import java.io.{File, IOException} +import java.nio.file.Files import com.yammer.metrics.reporting.CsvReporter import java.util.concurrent.TimeUnit +import kafka.common._ import kafka.utils.{Logging, VerifiableProperties} import org.apache.kafka.common.utils.Utils @@ -50,7 +52,12 @@ private class 
KafkaCSVMetricsReporter extends KafkaMetricsReporter val metricsConfig = new KafkaMetricsConfig(props) csvDir = new File(props.getString("kafka.csv.metrics.dir", "kafka_metrics")) Utils.delete(csvDir) - csvDir.mkdirs() + try { + Files.createDirectories(csvDir.toPath) + } catch { + case e: IOException => throw new KafkaException(s"Error in creating metrics directory ${csvDir.toPath}", e) + } + underlying = new CsvReporter(Metrics.defaultRegistry(), csvDir) if (props.getBoolean("kafka.csv.metrics.reporter.enabled", default = false)) { initialized = true diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala index 0449be5f5094..ff2f0eb94939 100755 --- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala @@ -17,11 +17,12 @@ package kafka.log -import java.io.File +import java.io.{File, IOException} +import java.nio.file.Files import java.util.Properties import kafka.api.{KAFKA_0_10_0_IV1, KAFKA_0_9_0} -import kafka.common.TopicAndPartition +import kafka.common._ import kafka.message._ import kafka.server.OffsetCheckpoint import kafka.utils._ @@ -262,7 +263,11 @@ class LogCleanerIntegrationTest(compressionCodec: String) { val logs = new Pool[TopicAndPartition, Log]() for(i <- 0 until parts) { val dir = new File(logDir, "log-" + i) - dir.mkdirs() + try { + Files.createDirectories(dir.toPath()) + } catch { + case e: IOException => throw new KafkaException(s"Error in creating new directory ${dir.toPath}", e) + } val log = new Log(dir = dir, LogConfig(logConfigProperties(propertyOverrides, maxMessageSize, minCleanableDirtyRatio)), diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala index 95a1cb5a9b9b..1c0cab18af16 100755 --- 
b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala @@ -18,6 +18,7 @@ package kafka.log import java.io._ +import java.nio.file.Files import java.util.Properties import kafka.common._ @@ -267,7 +268,12 @@ class LogManagerTest { def testRecoveryDirectoryMappingWithRelativeDirectory() { logManager.shutdown() logDir = new File("data" + File.separator + logDir.getName) - logDir.mkdirs() + try { + Files.createDirectories(logDir.toPath) + } catch { + case e: IOException => throw new KafkaException(s"Error in creating log directory ${logDir.toPath}", e) + } + logDir.deleteOnExit() logManager = createLogManager() logManager.startup diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index 7f6ef6e1517c..c6b149e845a6 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -18,11 +18,12 @@ package kafka.log import java.io._ +import java.nio.file.Files import java.util.Properties import org.apache.kafka.common.errors.{CorruptRecordException, OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException} import kafka.api.ApiVersion -import kafka.common.LongRef +import kafka.common._ import org.junit.Assert._ import org.scalatest.junit.JUnitSuite import org.junit.{After, Before, Test} @@ -336,7 +337,12 @@ class LogTest extends JUnitSuite { @Test def testThatGarbageCollectingSegmentsDoesntChangeOffset() { for(messagesToAppend <- List(0, 1, 25)) { - logDir.mkdirs() + try { + Files.createDirectories(logDir.toPath) + } catch { + case e: IOException => throw new KafkaException(s"Error in creating log directory ${logDir.toPath}", e) + } + // first test a log segment starting at 0 val logProps = new Properties() logProps.put(LogConfig.SegmentBytesProp, 100: java.lang.Integer) @@ -923,7 +929,11 @@ class LogTest extends JUnitSuite { val recoveryPoint = 50L for(iteration <- 0 until 50) { // create a log and write some messages to it - 
logDir.mkdirs() + try { + Files.createDirectories(logDir.toPath) + } catch { + case e: IOException => throw new KafkaException(s"Error in creating log directory ${logDir.toPath}", e) + } var log = new Log(logDir, config, recoveryPoint = 0L, diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 131a24a7f75f..ca16cad54b2a 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -20,6 +20,7 @@ package kafka.utils import java.io._ import java.nio._ import java.nio.channels._ +import java.nio.file.Files import java.util.concurrent.{Callable, Executors, TimeUnit} import java.util.{Properties, Random} import java.security.cert.X509Certificate @@ -37,7 +38,7 @@ import kafka.api._ import kafka.cluster.{Broker, EndPoint} import kafka.consumer.{ConsumerConfig, ConsumerTimeoutException, KafkaStream} import kafka.serializer.{DefaultEncoder, Encoder, StringEncoder} -import kafka.common.TopicAndPartition +import kafka.common._ import kafka.admin.AdminUtils import kafka.log._ import kafka.utils.ZkUtils._ @@ -80,7 +81,11 @@ object TestUtils extends Logging { */ def tempRelativeDir(parent: String): File = { val parentFile = new File(parent) - parentFile.mkdirs() + try { + Files.createDirectories(parentFile.toPath()) + } catch { + case e: IOException => throw new KafkaException(s"Error in creating directory ${parentFile.toPath}", e) + } JTestUtils.tempDirectory(parentFile.toPath, null) }