From 4c026ffc94c604a55be481a99ba8f533ed7002fc Mon Sep 17 00:00:00 2001 From: Prateek Maheshwari Date: Wed, 19 Apr 2017 17:26:50 -0700 Subject: [PATCH] SAMZA-1229: Disk space monitor should only count data in use by the container --- .../samza/container/SamzaContainer.scala | 16 ++++---- .../samza/storage/TaskStorageManager.scala | 39 ++++++++++--------- 2 files changed, 29 insertions(+), 26 deletions(-) diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index aba0d17ef8..a30b7938ea 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -413,7 +413,6 @@ object SamzaContainer extends Logging { info("Got default storage engine base directory: %s" format defaultStoreBaseDir) val storeWatchPaths = new util.HashSet[Path]() - storeWatchPaths.add(defaultStoreBaseDir.toPath) val taskInstances: Map[TaskName, TaskInstance] = containerModel.getTasks.values.asScala.map(taskModel => { debug("Setting up task instance: %s" format taskModel) @@ -455,8 +454,6 @@ object SamzaContainer extends Logging { loggedStorageBaseDir = defaultStoreBaseDir } - storeWatchPaths.add(loggedStorageBaseDir.toPath) - info("Got base directory for logged data stores: %s" format loggedStorageBaseDir) val taskStores = storageEngineFactories @@ -467,25 +464,30 @@ object SamzaContainer extends Logging { } else { null } + val keySerde = config.getStorageKeySerde(storeName) match { case Some(keySerde) => serdes.getOrElse(keySerde, throw new SamzaException("StorageKeySerde: No class defined for serde: %s." format keySerde)) case _ => null } + val msgSerde = config.getStorageMsgSerde(storeName) match { case Some(msgSerde) => serdes.getOrElse(msgSerde, throw new SamzaException("StorageMsgSerde: No class defined for serde: %s." format msgSerde)) case _ => null } - val storeBaseDir = if(changeLogSystemStreamPartition != null) { + + val storeDir = if (changeLogSystemStreamPartition != null) { TaskStorageManager.getStorePartitionDir(loggedStorageBaseDir, storeName, taskName) - } - else { + } else { TaskStorageManager.getStorePartitionDir(defaultStoreBaseDir, storeName, taskName) } + + storeWatchPaths.add(storeDir.toPath) + val storageEngine = storageEngineFactory.getStorageEngine( storeName, - storeBaseDir, + storeDir, keySerde, msgSerde, collector, diff --git a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala index 491f77b740..977ac5bf53 100644 --- a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala +++ b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala @@ -93,24 +93,24 @@ class TaskStorageManager( debug("Cleaning base directories for stores.") taskStores.keys.foreach(storeName => { - val storagePartitionDir = TaskStorageManager.getStorePartitionDir(storeBaseDir, storeName, taskName) - info("Got default storage partition directory as %s" format storagePartitionDir.toPath.toString) + val storePartitionDir = TaskStorageManager.getStorePartitionDir(storeBaseDir, storeName, taskName) + info("Got default storage partition directory as %s" format storePartitionDir.toPath.toString) - if(storagePartitionDir.exists()) { - info("Deleting default storage partition directory %s" format storagePartitionDir.toPath.toString) - Util.rm(storagePartitionDir) + if(storePartitionDir.exists()) { + info("Deleting default storage partition directory %s" format storePartitionDir.toPath.toString) + Util.rm(storePartitionDir) } - val loggedStoreDir = TaskStorageManager.getStorePartitionDir(loggedStoreBaseDir, storeName, taskName) - info("Got logged storage partition directory as %s" format loggedStoreDir.toPath.toString) + val loggedStorePartitionDir = TaskStorageManager.getStorePartitionDir(loggedStoreBaseDir, storeName, taskName) + info("Got logged storage partition directory as %s" format loggedStorePartitionDir.toPath.toString) // Delete the logged store if it is not valid. - if (!isLoggedStoreValid(storeName, loggedStoreDir)) { - info("Deleting logged storage partition directory %s." format loggedStoreDir.toPath.toString) - Util.rm(loggedStoreDir) + if (!isLoggedStoreValid(storeName, loggedStorePartitionDir)) { + info("Deleting logged storage partition directory %s." format loggedStorePartitionDir.toPath.toString) + Util.rm(loggedStorePartitionDir) } else { - val offset = readOffsetFile(loggedStoreDir) - info("Read offset %s for the store %s from logged storage partition directory %s." format(offset, storeName, loggedStoreDir)) + val offset = readOffsetFile(loggedStorePartitionDir) + info("Read offset %s for the store %s from logged storage partition directory %s." format(offset, storeName, loggedStorePartitionDir)) fileOffset.put(new SystemStreamPartition(changeLogSystemStreams(storeName), partition), offset) } }) @@ -182,13 +182,13 @@ class TaskStorageManager( taskStores.foreach { case (storeName, storageEngine) => if (storageEngine.getStoreProperties.isLoggedStore) { - val loggedStoragePartitionDir = TaskStorageManager.getStorePartitionDir(loggedStoreBaseDir, storeName, taskName) - info("Using logged storage partition directory: %s for store: %s." format(loggedStoragePartitionDir.toPath.toString, storeName)) - if (!loggedStoragePartitionDir.exists()) loggedStoragePartitionDir.mkdirs() + val loggedStorePartitionDir = TaskStorageManager.getStorePartitionDir(loggedStoreBaseDir, storeName, taskName) + info("Using logged storage partition directory: %s for store: %s." format(loggedStorePartitionDir.toPath.toString, storeName)) + if (!loggedStorePartitionDir.exists()) loggedStorePartitionDir.mkdirs() } else { - val storagePartitionDir = TaskStorageManager.getStorePartitionDir(storeBaseDir, storeName, taskName) - info("Using storage partition directory: %s for store: %s." format(storagePartitionDir.toPath.toString, storeName)) - storagePartitionDir.mkdirs() + val storePartitionDir = TaskStorageManager.getStorePartitionDir(storeBaseDir, storeName, taskName) + info("Using storage partition directory: %s for store: %s." format(storePartitionDir.toPath.toString, storeName)) + storePartitionDir.mkdirs() } } } @@ -322,7 +322,8 @@ class TaskStorageManager( } debug("Got offset %s for store %s" format(newestOffset, storeName)) - val offsetFile = new File(TaskStorageManager.getStorePartitionDir(loggedStoreBaseDir, storeName, taskName), offsetFileName) + val loggedStorePartitionDir = TaskStorageManager.getStorePartitionDir(loggedStoreBaseDir, storeName, taskName) + val offsetFile = new File(loggedStorePartitionDir, offsetFileName) if (newestOffset != null) { debug("Storing offset for store in OFFSET file ") Util.writeDataToFile(offsetFile, newestOffset)