diff --git a/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java index 228617a0f3..737ac3e0c8 100644 --- a/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java @@ -29,6 +29,7 @@ import org.apache.samza.job.model.JobModel; import org.apache.samza.coordinator.JobCoordinatorListener; import org.apache.samza.runtime.ProcessorIdGenerator; +import org.apache.samza.storage.ChangelogStreamManager; import org.apache.samza.system.StreamMetadataCache; import org.apache.samza.system.SystemAdmins; import org.apache.samza.util.*; @@ -81,6 +82,8 @@ public void start() { if (checkpointManager != null) { checkpointManager.createResources(); } + + ChangelogStreamManager.createChangelogStreams(config, jobModel.maxChangeLogStreamPartitions); } catch (Exception e) { LOGGER.error("Exception while trying to getJobModel.", e); if (coordinatorListener != null) {