From d9553ffee6227488fe9e5e29901b7fe1bf789183 Mon Sep 17 00:00:00 2001 From: Ray Matharu Date: Thu, 27 Jun 2019 12:08:26 -0700 Subject: [PATCH] Ensuring store consumers are started exactly once --- .../org/apache/samza/storage/ContainerStorageManager.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java index 9cfa9ab8ca..d3fa6ec26d 100644 --- a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java +++ b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java @@ -629,8 +629,8 @@ private void restoreStores() { // initialize each TaskStorageManager this.taskRestoreManagers.values().forEach(taskStorageManager -> taskStorageManager.initialize()); - // Start store consumers - this.storeConsumers.values().forEach(systemConsumer -> systemConsumer.start()); + // Start each store consumer once + this.storeConsumers.values().stream().distinct().forEach(systemConsumer -> systemConsumer.start()); // Create a thread pool for parallel restores (and stopping of persistent stores) ExecutorService executorService = Executors.newFixedThreadPool(this.parallelRestoreThreadPoolSize, @@ -657,8 +657,8 @@ private void restoreStores() { executorService.shutdown(); - // Stop store consumers - this.storeConsumers.values().forEach(systemConsumer -> systemConsumer.stop()); + // Stop each store consumer once + this.storeConsumers.values().stream().distinct().forEach(systemConsumer -> systemConsumer.stop()); // Now re-create persistent stores in read-write mode, leave non-persistent stores as-is recreatePersistentTaskStoresInReadWriteMode(this.containerModel, jobContext, containerContext,