From af00fd145f05cb7bee2474ee37f366e1fd56912d Mon Sep 17 00:00:00 2001 From: Hari Shreedharan Date: Wed, 17 Dec 2014 13:35:20 -0800 Subject: [PATCH 1/4] [SPARK-4790][STREAMING] Fix ReceivedBlockTrackerSuite waits for old files to get deleted before continuing. Since the deletes are happening asynchronously, the getFileStatus call might throw an exception in older HDFS versions, if the delete happens between the time listFiles is called on the directory and getFileStatus is called on the file in the getFileStatus method. This PR addresses this by adding an option to delete the files synchronously and then waiting for the deletion to complete before proceeding. --- .../scheduler/ReceivedBlockTracker.scala | 12 ++++++-- .../streaming/util/WriteAheadLogManager.scala | 28 +++++++++++++++++-- .../streaming/ReceivedBlockTrackerSuite.scala | 2 +- 3 files changed, 36 insertions(+), 6 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala index 02758e0bca6c5..958eb767d16a9 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala @@ -139,14 +139,22 @@ private[streaming] class ReceivedBlockTracker( getReceivedBlockQueue(streamId).toSeq } - /** Clean up block information of old batches. */ + /** Clean up block information of old batches asynchronously. */ def cleanupOldBatches(cleanupThreshTime: Time): Unit = synchronized { + cleanupOldBatches(cleanupThreshTime, waitForCompletion = false) + } + + /** + * Clean up block information of old batches. If waitForCompletion is true, this method + * returns only after the files are cleaned up. 
+ */ + def cleanupOldBatches(cleanupThreshTime: Time, waitForCompletion: Boolean): Unit = synchronized { assert(cleanupThreshTime.milliseconds < clock.currentTime()) val timesToCleanup = timeToAllocatedBlocks.keys.filter { _ < cleanupThreshTime }.toSeq logInfo("Deleting batches " + timesToCleanup) writeToLog(BatchCleanupEvent(timesToCleanup)) timeToAllocatedBlocks --= timesToCleanup - logManagerOption.foreach(_.cleanupOldLogs(cleanupThreshTime.milliseconds)) + logManagerOption.foreach(_.cleanupOldLogs(cleanupThreshTime.milliseconds, waitForCompletion)) log } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala index 70d234320be7c..6c8c74a88f08e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala @@ -19,11 +19,11 @@ package org.apache.spark.streaming.util import java.nio.ByteBuffer import scala.collection.mutable.ArrayBuffer -import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.duration.Duration +import scala.concurrent.{Await, ExecutionContext, Future} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path -import org.apache.hadoop.fs.permission.FsPermission import org.apache.spark.Logging import org.apache.spark.util.Utils import WriteAheadLogManager._ @@ -124,8 +124,25 @@ private[streaming] class WriteAheadLogManager( * files, which is usually based on the local system time. So if there is coordination necessary * between the node calculating the threshTime (say, driver node), and the local system time * (say, worker node), the caller has to take account of possible time skew. + * + * This method deletes old log files asynchronously. 
*/ def cleanupOldLogs(threshTime: Long): Unit = { + cleanupOldLogs(threshTime, waitForCompletion = false) + } + + /** + * Delete the log files that are older than the threshold time. + * + * Its important to note that the threshold time is based on the time stamps used in the log + * files, which is usually based on the local system time. So if there is coordination necessary + * between the node calculating the threshTime (say, driver node), and the local system time + * (say, worker node), the caller has to take account of possible time skew. + * + * If waitForCompletion is set to true, this method will return only after old logs have been + * deleted. Else the files will be deleted asynchronously. + */ + def cleanupOldLogs(threshTime: Long, waitForCompletion: Boolean): Unit = { val oldLogFiles = synchronized { pastLogs.filter { _.endTime < threshTime } } logInfo(s"Attempting to clear ${oldLogFiles.size} old log files in $logDirectory " + s"older than $threshTime: ${oldLogFiles.map { _.path }.mkString("\n")}") @@ -146,10 +163,15 @@ private[streaming] class WriteAheadLogManager( logInfo(s"Cleared log files in $logDirectory older than $threshTime") } if (!executionContext.isShutdown) { - Future { deleteFiles() } + val f = Future { deleteFiles() } + if (waitForCompletion) { + import scala.concurrent.duration._ + Await.ready(f, 1 second) + } } } + /** Stop the manager, close any open log writer */ def stop(): Unit = synchronized { if (currentLogWriter != null) { diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala index 01a09b67b99dc..de7e9d624bf6b 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala @@ -166,7 +166,7 @@ class ReceivedBlockTrackerSuite // Cleanup first batch but not second batch val 
oldestLogFile = getWriteAheadLogFiles().head incrementTime() - tracker3.cleanupOldBatches(batchTime2) + tracker3.cleanupOldBatches(batchTime2, waitForCompletion = true) // Verify that the batch allocations have been cleaned, and the act has been written to log tracker3.getBlocksOfBatchAndStream(batchTime1, streamId) shouldEqual Seq.empty From e4c83eca17c660644216b6d8cc862f8b1654649c Mon Sep 17 00:00:00 2001 From: Hari Shreedharan Date: Wed, 24 Dec 2014 14:38:20 -0800 Subject: [PATCH 2/4] Making waitForCompletion a mandatory param. Remove eventually from WALSuite since the cleanup method returns only after all files are deleted. --- .../streaming/receiver/ReceivedBlockHandler.scala | 2 +- .../streaming/util/WriteAheadLogManager.scala | 14 -------------- .../spark/streaming/util/WriteAheadLogSuite.scala | 6 ++---- 3 files changed, 3 insertions(+), 19 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala index fdf995320beb4..2ac3b592dd90f 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala @@ -178,7 +178,7 @@ private[streaming] class WriteAheadLogBasedBlockHandler( } def cleanupOldBlock(threshTime: Long) { - logManager.cleanupOldLogs(threshTime) + logManager.cleanupOldLogs(threshTime, waitForCompletion = false) } def stop() { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala index 6c8c74a88f08e..9e16b6080c495 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala @@ -117,20 +117,6 @@ private[streaming] class 
WriteAheadLogManager( } flatMap { x => x } } - /** - * Delete the log files that are older than the threshold time. - * - * Its important to note that the threshold time is based on the time stamps used in the log - * files, which is usually based on the local system time. So if there is coordination necessary - * between the node calculating the threshTime (say, driver node), and the local system time - * (say, worker node), the caller has to take account of possible time skew. - * - * This method deletes old log files asynchronously. - */ - def cleanupOldLogs(threshTime: Long): Unit = { - cleanupOldLogs(threshTime, waitForCompletion = false) - } - /** * Delete the log files that are older than the threshold time. * diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala index 8f69bcb64279d..fe66d7c9713f7 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala @@ -188,10 +188,8 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter { manager = writeDataUsingManager(testDir, dataToWrite, manualClock, stopManager = false) val logFiles = getLogFilesInDirectory(testDir) assert(logFiles.size > 1) - manager.cleanupOldLogs(manualClock.currentTime() / 2) - eventually(timeout(1 second), interval(10 milliseconds)) { - assert(getLogFilesInDirectory(testDir).size < logFiles.size) - } + manager.cleanupOldLogs(manualClock.currentTime() / 2, waitForCompletion = true) + assert(getLogFilesInDirectory(testDir).size < logFiles.size) } test("WriteAheadLogManager - handling file errors while reading rotating logs") { From 3255f1710d7ffbbbd2de4b098eb444885b2aad36 Mon Sep 17 00:00:00 2001 From: Hari Shreedharan Date: Wed, 24 Dec 2014 20:55:22 -0800 Subject: [PATCH 3/4] Add test for async deletion. 
Remove method from ReceivedBlockTracker that does not take waitForCompletion. --- .../scheduler/ReceivedBlockTracker.scala | 5 ---- .../streaming/scheduler/ReceiverTracker.scala | 2 +- .../streaming/util/WriteAheadLogSuite.scala | 24 +++++++++++++++++-- 3 files changed, 23 insertions(+), 8 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala index 958eb767d16a9..2ce458cddec1a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala @@ -139,11 +139,6 @@ private[streaming] class ReceivedBlockTracker( getReceivedBlockQueue(streamId).toSeq } - /** Clean up block information of old batches asynchronously. */ - def cleanupOldBatches(cleanupThreshTime: Time): Unit = synchronized { - cleanupOldBatches(cleanupThreshTime, waitForCompletion = false) - } - /** * Clean up block information of old batches. If waitForCompletion is true, this method * returns only after the files are cleaned up. 
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala index 32e481dabc8ca..019549796aa13 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -121,7 +121,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false /** Clean up metadata older than the given threshold time */ def cleanupOldMetadata(cleanupThreshTime: Time) { - receivedBlockTracker.cleanupOldBatches(cleanupThreshTime) + receivedBlockTracker.cleanupOldBatches(cleanupThreshTime, waitForCompletion = false) } /** Register a receiver */ diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala index fe66d7c9713f7..6467c3ac6c275 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala @@ -182,14 +182,34 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter { } test("WriteAheadLogManager - cleanup old logs") { + logCleanUpTest(waitForCompletion = false) + } + + test("WriteAheadLogManager - cleanup old logs synchronously") { + logCleanUpTest(waitForCompletion = true) + } + + private def logCleanUpTest(waitForCompletion: Boolean): Unit = { // Write data with manager, recover with new manager and verify val manualClock = new ManualClock val dataToWrite = generateRandomData() manager = writeDataUsingManager(testDir, dataToWrite, manualClock, stopManager = false) val logFiles = getLogFilesInDirectory(testDir) assert(logFiles.size > 1) - manager.cleanupOldLogs(manualClock.currentTime() / 2, waitForCompletion = true) - assert(getLogFilesInDirectory(testDir).size < 
logFiles.size) + + // To avoid code repeat + def cleanUpAndVerify(): Unit = { + manager.cleanupOldLogs(manualClock.currentTime() / 2, waitForCompletion) + assert(getLogFilesInDirectory(testDir).size < logFiles.size) + } + + if (waitForCompletion) { + cleanUpAndVerify() + } else { + eventually(timeout(1 second), interval(10 milliseconds)) { + cleanUpAndVerify() + } + } } test("WriteAheadLogManager - handling file errors while reading rotating logs") { From bbbacd1a441e43ce46e49bea6c85c6d7834c5487 Mon Sep 17 00:00:00 2001 From: Hari Shreedharan Date: Mon, 29 Dec 2014 15:09:13 -0800 Subject: [PATCH 4/4] Call cleanUpOldLogs only once in the tests. --- .../streaming/receiver/ReceivedBlockHandler.scala | 6 +++--- .../spark/streaming/util/WriteAheadLogManager.scala | 3 ++- .../spark/streaming/ReceivedBlockHandlerSuite.scala | 2 +- .../spark/streaming/util/WriteAheadLogSuite.scala | 10 +++------- 4 files changed, 9 insertions(+), 12 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala index 2ac3b592dd90f..d46ebbadb9749 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala @@ -42,7 +42,7 @@ private[streaming] trait ReceivedBlockHandler { def storeBlock(blockId: StreamBlockId, receivedBlock: ReceivedBlock): ReceivedBlockStoreResult /** Cleanup old blocks older than the given threshold time */ - def cleanupOldBlock(threshTime: Long) + def cleanupOldBlocks(threshTime: Long) } @@ -82,7 +82,7 @@ private[streaming] class BlockManagerBasedBlockHandler( BlockManagerBasedStoreResult(blockId) } - def cleanupOldBlock(threshTime: Long) { + def cleanupOldBlocks(threshTime: Long) { // this is not used as blocks inserted into the BlockManager are cleared by DStream's clearing // of BlockRDDs. 
} @@ -177,7 +177,7 @@ private[streaming] class WriteAheadLogBasedBlockHandler( WriteAheadLogBasedStoreResult(blockId, segment) } - def cleanupOldBlock(threshTime: Long) { + def cleanupOldBlocks(threshTime: Long) { logManager.cleanupOldLogs(threshTime, waitForCompletion = false) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala index 9e16b6080c495..166661b7496df 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala @@ -126,7 +126,8 @@ private[streaming] class WriteAheadLogManager( * (say, worker node), the caller has to take account of possible time skew. * * If waitForCompletion is set to true, this method will return only after old logs have been - * deleted. Else the files will be deleted asynchronously. + * deleted. This should be set to true only for testing. Else the files will be deleted + * asynchronously. 
*/ def cleanupOldLogs(threshTime: Long, waitForCompletion: Boolean): Unit = { val oldLogFiles = synchronized { pastLogs.filter { _.endTime < threshTime } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala index 3661e16a9ef2f..132ff2443fc0f 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala @@ -168,7 +168,7 @@ class ReceivedBlockHandlerSuite extends FunSuite with BeforeAndAfter with Matche manualClock.currentTime() shouldEqual 5000L val cleanupThreshTime = 3000L - handler.cleanupOldBlock(cleanupThreshTime) + handler.cleanupOldBlocks(cleanupThreshTime) eventually(timeout(10000 millis), interval(10 millis)) { getWriteAheadLogFiles().size should be < preCleanupLogFiles.size } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala index 6467c3ac6c275..7ce9499dc614d 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala @@ -197,17 +197,13 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter { val logFiles = getLogFilesInDirectory(testDir) assert(logFiles.size > 1) - // To avoid code repeat - def cleanUpAndVerify(): Unit = { - manager.cleanupOldLogs(manualClock.currentTime() / 2, waitForCompletion) - assert(getLogFilesInDirectory(testDir).size < logFiles.size) - } + manager.cleanupOldLogs(manualClock.currentTime() / 2, waitForCompletion) if (waitForCompletion) { - cleanUpAndVerify() + assert(getLogFilesInDirectory(testDir).size < logFiles.size) } else { eventually(timeout(1 second), interval(10 milliseconds)) { - cleanUpAndVerify() + 
assert(getLogFilesInDirectory(testDir).size < logFiles.size) } } }