Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -262,13 +262,15 @@ public void run() {
MetadataResourceUtil metadataResourceUtil = new MetadataResourceUtil(jobModel, this.metrics, config);
metadataResourceUtil.createResources();

// fan out the startpoints
StartpointManager startpointManager = createStartpointManager();
startpointManager.start();
try {
startpointManager.fanOut(JobModelUtil.getTaskToSystemStreamPartitions(jobModel));
} finally {
startpointManager.stop();
// fan out the startpoints if startpoints is enabled
if (new JobConfig(config).getStartpointEnabled()) {
StartpointManager startpointManager = createStartpointManager();
startpointManager.start();
try {
startpointManager.fanOut(JobModelUtil.getTaskToSystemStreamPartitions(jobModel));
} finally {
startpointManager.stop();
}
}

// Remap changelog partitions to tasks
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ public class JobConfig extends MapConfig {
public static final String CLUSTER_BASED_JOB_COORDINATOR_DEPENDENCY_ISOLATION_ENABLED =
"samza.cluster.based.job.coordinator.dependency.isolation.enabled";

private static final String JOB_STARTPOINT_ENABLED = "job.startpoint.enabled";

public JobConfig(Config config) {
super(config);
}
Expand Down Expand Up @@ -404,4 +406,8 @@ public String getCoordinatorStreamFactory() {
public Optional<String> getConfigLoaderFactory() {
return Optional.ofNullable(get(CONFIG_LOADER_FACTORY));
}

public boolean getStartpointEnabled() {
return getBoolean(JOB_STARTPOINT_ENABLED, true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -396,8 +396,10 @@ SamzaContainer createSamzaContainer(String processorId, JobModel jobModel) {
// Metadata store lifecycle managed outside of the SamzaContainer.
// All manager lifecycles are managed in the SamzaContainer including startpointManager
StartpointManager startpointManager = null;
if (metadataStore != null) {
if (metadataStore != null && new JobConfig(config).getStartpointEnabled()) {
startpointManager = new StartpointManager(metadataStore);
} else if (!new JobConfig(config).getStartpointEnabled()) {
LOGGER.warn("StartpointManager not instantiated because startpoints is not enabled");
} else {
LOGGER.warn("StartpointManager cannot be instantiated because no metadata store defined for this stream processor");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,10 @@ private static void run(
LocalityManager localityManager = new LocalityManager(new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetContainerHostMapping.TYPE));

// StartpointManager wraps the coordinatorStreamStore in the namespaces internally
StartpointManager startpointManager = new StartpointManager(coordinatorStreamStore);
StartpointManager startpointManager = null;
if (new JobConfig(config).getStartpointEnabled()) {
startpointManager = new StartpointManager(coordinatorStreamStore);
}

Map<String, MetricsReporter> metricsReporters = loadMetricsReporters(appDesc, containerId, config);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,24 @@ public void writeStartpoint(SystemStreamPartition ssp, TaskName taskName, Startp
* @return {@link Optional} of {@link Startpoint} for the {@link SystemStreamPartition}.
* It is empty if it does not exist or if it is too stale.
*/
@VisibleForTesting
public Optional<Startpoint> readStartpoint(SystemStreamPartition ssp) {
return readStartpoint(ssp, null);
Comment thread
rmatharu-zz marked this conversation as resolved.
Map<String, byte[]> startpointBytes = readWriteStore.all();
// there is no task-name to use as key for the startpoint in this case (only the ssp), so we use a null task-name
return readStartpoint(startpointBytes, ssp, null);
}

/**
* Returns the last {@link Startpoint} that defines the start position for a {@link SystemStreamPartition} and {@link TaskName}.
* @param ssp The {@link SystemStreamPartition} to fetch the {@link Startpoint} for.
* @param taskName the {@link TaskName} to fetch the {@link Startpoint} for.
* @return {@link Optional} of {@link Startpoint} for the {@link SystemStreamPartition}.
* It is empty if it does not exist or if it is too stale.
*/
@VisibleForTesting
public Optional<Startpoint> readStartpoint(SystemStreamPartition ssp, TaskName taskName) {
Map<String, byte[]> startpointBytes = readWriteStore.all();
return readStartpoint(startpointBytes, ssp, taskName);
}

/**
Expand All @@ -169,11 +185,11 @@ public Optional<Startpoint> readStartpoint(SystemStreamPartition ssp) {
* @return {@link Optional} of {@link Startpoint} for the {@link SystemStreamPartition} and {@link TaskName}.
* It is empty if it does not exist or if it is too stale.
*/
public Optional<Startpoint> readStartpoint(SystemStreamPartition ssp, TaskName taskName) {
public Optional<Startpoint> readStartpoint(Map<String, byte[]> startpointMap, SystemStreamPartition ssp, TaskName taskName) {
Preconditions.checkState(!stopped, "Underlying metadata store not available");
Preconditions.checkNotNull(ssp, "SystemStreamPartition cannot be null");

byte[] startpointBytes = readWriteStore.get(toReadWriteStoreKey(ssp, taskName));
byte[] startpointBytes = startpointMap.get(toReadWriteStoreKey(ssp, taskName));

if (ArrayUtils.isNotEmpty(startpointBytes)) {
try {
Expand All @@ -191,6 +207,7 @@ public Optional<Startpoint> readStartpoint(SystemStreamPartition ssp, TaskName t
return Optional.empty();
}


/**
* Deletes the {@link Startpoint} for a {@link SystemStreamPartition}
* @param ssp The {@link SystemStreamPartition} to delete the {@link Startpoint} for.
Expand Down Expand Up @@ -240,17 +257,19 @@ public Map<TaskName, Map<SystemStreamPartition, Startpoint>> fanOut(Map<TaskName
Instant now = Instant.now();
HashMultimap<SystemStreamPartition, TaskName> deleteKeys = HashMultimap.create();
HashMap<TaskName, StartpointFanOutPerTask> fanOuts = new HashMap<>();
Map<String, byte[]> startpointMap = readWriteStore.all();

for (TaskName taskName : taskToSSPs.keySet()) {
Set<SystemStreamPartition> ssps = taskToSSPs.get(taskName);
if (CollectionUtils.isEmpty(ssps)) {
LOG.warn("No SSPs are mapped to taskName: {}", taskName.getTaskName());
continue;
}
for (SystemStreamPartition ssp : ssps) {
Optional<Startpoint> startpoint = readStartpoint(ssp); // Read SSP-only key
Optional<Startpoint> startpoint = readStartpoint(startpointMap, ssp, null); // Read SSP-only key
startpoint.ifPresent(sp -> deleteKeys.put(ssp, null));

Optional<Startpoint> startpointForTask = readStartpoint(ssp, taskName); // Read SSP+taskName key
Optional<Startpoint> startpointForTask = readStartpoint(startpointMap, ssp, taskName); // Read SSP+taskName key
startpointForTask.ifPresent(sp -> deleteKeys.put(ssp, taskName));

Optional<Startpoint> startpointWithPrecedence = resolveStartpointPrecendence(startpoint, startpointForTask);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -326,13 +326,15 @@ void loadMetadataResources(JobModel jobModel) {
}
configStore.flush();

// fan out the startpoints
StartpointManager startpointManager = createStartpointManager();
startpointManager.start();
try {
startpointManager.fanOut(JobModelUtil.getTaskToSystemStreamPartitions(jobModel));
} finally {
startpointManager.stop();
if (new JobConfig(config).getStartpointEnabled()) {
// fan out the startpoints
StartpointManager startpointManager = createStartpointManager();
startpointManager.start();
try {
startpointManager.fanOut(JobModelUtil.getTaskToSystemStreamPartitions(jobModel));
} finally {
startpointManager.stop();
}
}
} else {
LOG.warn("No metadata store registered to this job coordinator. Config not written to the metadata store and no Startpoints fan out.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,15 @@ class ProcessJobFactory extends StreamJobFactory with Logging {
val metadataResourceUtil = new MetadataResourceUtil(jobModel, metricsRegistry, config)
metadataResourceUtil.createResources()

// fan out the startpoints
val startpointManager = new StartpointManager(coordinatorStreamStore)
startpointManager.start()
try {
startpointManager.fanOut(JobModelUtil.getTaskToSystemStreamPartitions(jobModel))
} finally {
startpointManager.stop()
if (new JobConfig(config).getStartpointEnabled()) {
// fan out the startpoints
val startpointManager = new StartpointManager(coordinatorStreamStore)
startpointManager.start()
try {
startpointManager.fanOut(JobModelUtil.getTaskToSystemStreamPartitions(jobModel))
} finally {
startpointManager.stop()
}
}

val taskConfig = new TaskConfig(config)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,15 @@ class ThreadJobFactory extends StreamJobFactory with Logging {
val metadataResourceUtil = new MetadataResourceUtil(jobModel, metricsRegistry, config)
metadataResourceUtil.createResources()

// fan out the startpoints
val startpointManager = new StartpointManager(coordinatorStreamStore)
startpointManager.start()
try {
startpointManager.fanOut(JobModelUtil.getTaskToSystemStreamPartitions(jobModel))
} finally {
startpointManager.stop()
if (new JobConfig(config).getStartpointEnabled()) {
// fan out the startpoints
val startpointManager = new StartpointManager(coordinatorStreamStore)
startpointManager.start()
try {
startpointManager.fanOut(JobModelUtil.getTaskToSystemStreamPartitions(jobModel))
} finally {
startpointManager.stop()
}
}

val containerId = "0"
Expand Down