Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SAMZA-2493: Keep checkpoint manager consumer open for repeated polling #1327

Merged
merged 5 commits into from
Apr 7, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ public class TaskConfig extends MapConfig {
private static final String BROADCAST_STREAM_PATTERN = "^[\\d]+$";
private static final String BROADCAST_STREAM_RANGE_PATTERN = "^\\[[\\d]+\\-[\\d]+\\]$";
public static final String CHECKPOINT_MANAGER_FACTORY = "task.checkpoint.factory";
public static final String CHECKPOINT_MANAGER_CONSUMER_STOP_AFTER_FIRST_READ = "task.checkpoint.consumer.stop.after.first.read";
bkonold marked this conversation as resolved.
Show resolved Hide resolved

public static final String TRANSACTIONAL_STATE_CHECKPOINT_ENABLED = "task.transactional.state.checkpoint.enabled";
private static final boolean DEFAULT_TRANSACTIONAL_STATE_CHECKPOINT_ENABLED = true;
Expand Down Expand Up @@ -213,6 +214,14 @@ public Optional<CheckpointManager> getCheckpointManager(MetricsRegistry metricsR
CheckpointManagerFactory.class).getCheckpointManager(this, metricsRegistry));
}

/**
* Internal config to indicate whether the SystemConsumer underlying a CheckpointManager should be left open after
* initial read of checkpoints.
*/
public boolean getCheckpointManagerConsumerStopAfterFirstRead() {
return getBoolean(CHECKPOINT_MANAGER_CONSUMER_STOP_AFTER_FIRST_READ, true);
}

/**
* Get the systemStreamPartitions of the broadcast stream. Specifying
* one partition for one stream or a range of the partitions for one
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import org.apache.samza.task._
import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
import org.apache.samza.util.{Util, _}
import org.apache.samza.SamzaException
import org.apache.samza.clustermanager.StandbyTaskUtil

import scala.collection.JavaConverters._

Expand Down Expand Up @@ -133,6 +134,12 @@ object SamzaContainer extends Logging {
startpointManager: StartpointManager = null,
diagnosticsManager: Option[DiagnosticsManager] = Option.empty) = {
val config = jobContext.getConfig
if (StandbyTaskUtil.isStandbyContainer(containerId)) {
// standby containers will need to continually poll checkpoint messages
val newConfig = new util.HashMap[String, String]()
newConfig.putAll(config)
newConfig.put(TaskConfig.CHECKPOINT_MANAGER_CONSUMER_STOP_AFTER_FIRST_READ, java.lang.Boolean.FALSE.toString)
}
val jobConfig = new JobConfig(config)
val systemConfig = new SystemConfig(config)
val containerModel = jobModel.getContainers.get(containerId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import java.util.concurrent.atomic.AtomicReference
import com.google.common.annotations.VisibleForTesting
import com.google.common.base.Preconditions
import org.apache.samza.checkpoint.{Checkpoint, CheckpointManager}
import org.apache.samza.config.{Config, JobConfig}
import org.apache.samza.config.{Config, JobConfig, TaskConfig}
import org.apache.samza.container.TaskName
import org.apache.samza.serializers.Serde
import org.apache.samza.metrics.MetricsRegistry
Expand Down Expand Up @@ -76,6 +76,8 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec,
val producerRef: AtomicReference[SystemProducer] = new AtomicReference[SystemProducer](getSystemProducer())
val producerCreationLock: Object = new Object

val stopConsumerAfterFirstRead: Boolean = new TaskConfig(config).getCheckpointManagerConsumerStopAfterFirstRead
bkonold marked this conversation as resolved.
Show resolved Hide resolved

/**
* Create checkpoint stream prior to start.
*/
Expand Down Expand Up @@ -107,7 +109,6 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec,
info(s"Starting the checkpoint SystemConsumer from oldest offset $oldestOffset")
systemConsumer.register(checkpointSsp, oldestOffset)
systemConsumer.start()
// the consumer will be closed after first time reading the checkpoint
}

/**
Expand All @@ -132,9 +133,12 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec,
if (taskNamesToCheckpoints == null) {
info("Reading checkpoints for the first time")
taskNamesToCheckpoints = readCheckpoints()
// Stop the system consumer since we only need to read checkpoints once
info("Stopping system consumer.")
systemConsumer.stop()
if (stopConsumerAfterFirstRead) {
info("Stopping system consumer")
systemConsumer.stop()
}
} else if (!stopConsumerAfterFirstRead) {
taskNamesToCheckpoints ++= readCheckpoints()
}

val checkpoint: Checkpoint = taskNamesToCheckpoints.getOrElse(taskName, null)
Expand Down Expand Up @@ -220,6 +224,11 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec,
info ("Stopping system producer.")
producerRef.get().stop()

if (!stopConsumerAfterFirstRead) {
info("Stopping system consumer")
systemConsumer.stop()
}

info("CheckpointManager stopped.")
}

Expand Down