diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala index 50d22b1b84..965209d7b3 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala @@ -53,6 +53,9 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec, checkpointMsgSerde: Serde[Checkpoint] = new CheckpointSerde, checkpointKeySerde: Serde[KafkaCheckpointLogKey] = new KafkaCheckpointLogKeySerde) extends CheckpointManager with Logging { + // Retry duration is approximately 83 minutes. + var MaxRetriesOnFailure = 50 + info(s"Creating KafkaCheckpointManager for checkpointTopic:$checkpointTopic, systemName:$checkpointSystem " + s"validateCheckpoints:$validateCheckpoint") @@ -159,7 +162,12 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec, }, (exception, loop) => { - warn(s"Retrying failed checkpoint write to key: $key, checkpoint: $checkpoint for task: $taskName", exception) + if (loop.sleepCount >= MaxRetriesOnFailure) { + error(s"Exhausted $MaxRetriesOnFailure retries when writing checkpoint: $checkpoint for task: $taskName.") + throw new SamzaException(s"Exception when writing checkpoint: $checkpoint for task: $taskName.", exception) + } else { + warn(s"Retrying failed checkpoint write to key: $key, checkpoint: $checkpoint for task: $taskName", exception) + } } ) } diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala index c4e57f728f..3761ea12d9 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala @@ -33,6 +33,7 @@ import org.apache.samza.checkpoint.Checkpoint import org.apache.samza.config._ import org.apache.samza.container.TaskName import org.apache.samza.container.grouper.stream.GroupByPartitionFactory +import org.apache.samza.metrics.MetricsRegistry import org.apache.samza.serializers.CheckpointSerde import org.apache.samza.system._ import org.apache.samza.system.kafka.{KafkaStreamSpec, KafkaSystemFactory} @@ -40,6 +41,7 @@ import org.apache.samza.util.{KafkaUtilException, NoOpMetricsRegistry, Util} import org.apache.samza.{Partition, SamzaException} import org.junit.Assert._ import org.junit._ +import org.mockito.Mockito class TestKafkaCheckpointManager extends KafkaServerTestHarness { @@ -96,6 +98,29 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness { assertEquals(checkpoint2, readCheckpoint(checkpointTopic, taskName)) } + @Test(expected = classOf[SamzaException]) + def testWriteCheckpointShouldRetryFiniteTimesOnFailure: Unit = { + val checkpointTopic = "checkpoint-topic-2" + val mockKafkaProducer: SystemProducer = Mockito.mock(classOf[SystemProducer]) + + class MockSystemFactory extends KafkaSystemFactory { + override def getProducer(systemName: String, config: Config, registry: MetricsRegistry): SystemProducer = { + mockKafkaProducer + } + } + + Mockito.doThrow(new RuntimeException()).when(mockKafkaProducer).flush(taskName.getTaskName) + + val props = new org.apache.samza.config.KafkaConfig(config).getCheckpointTopicProperties() + val spec = new KafkaStreamSpec("id", checkpointTopic, checkpointSystemName, 1, 1, false, props) + val checkPointManager = new KafkaCheckpointManager(spec, new MockSystemFactory, false, config, new NoOpMetricsRegistry) + checkPointManager.MaxRetriesOnFailure = 1 + + checkPointManager.register(taskName) + checkPointManager.start + checkPointManager.writeCheckpoint(taskName, new Checkpoint(ImmutableMap.of())) + } + @Test def testFailOnTopicValidation { // By default, should fail if there is a topic validation error