diff --git a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java index 9cfa9ab8ca..d3fa6ec26d 100644 --- a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java +++ b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java @@ -629,8 +629,8 @@ private void restoreStores() { // initialize each TaskStorageManager this.taskRestoreManagers.values().forEach(taskStorageManager -> taskStorageManager.initialize()); - // Start store consumers - this.storeConsumers.values().forEach(systemConsumer -> systemConsumer.start()); + // Start each store consumer once + this.storeConsumers.values().stream().distinct().forEach(systemConsumer -> systemConsumer.start()); // Create a thread pool for parallel restores (and stopping of persistent stores) ExecutorService executorService = Executors.newFixedThreadPool(this.parallelRestoreThreadPoolSize, @@ -657,8 +657,8 @@ private void restoreStores() { executorService.shutdown(); - // Stop store consumers - this.storeConsumers.values().forEach(systemConsumer -> systemConsumer.stop()); + // Stop each store consumer once + this.storeConsumers.values().stream().distinct().forEach(systemConsumer -> systemConsumer.stop()); // Now re-create persistent stores in read-write mode, leave non-persistent stores as-is recreatePersistentTaskStoresInReadWriteMode(this.containerModel, jobContext, containerContext,