Skip to content

Commit

Permalink
SAMZA-1572: Add fixed retries on failure in KafkaCheckpointManager.
Browse files Browse the repository at this point in the history
KafkaCheckpointManager.writeCheckpoint currently goes into an infinite loop when an irrecoverable failure happens; this indefinitely blocks the commit phase (thereby preventing processing). Added finite retries (50), which retry for a fixed duration in case of failure before giving up.

Author: Shanthoosh Venkataraman <svenkataraman@linkedin.com>

Reviewers: Prateek M<prateekm@cs.utexas.edu>

Closes apache#420 from shanthoosh/add_fixed_retries_in_kafka_checkpoint_manager
  • Loading branch information
shanthoosh authored and atoomula committed Mar 27, 2018
1 parent 15e434c commit 28191d8
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 1 deletion.
Expand Up @@ -53,6 +53,9 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec,
checkpointMsgSerde: Serde[Checkpoint] = new CheckpointSerde,
checkpointKeySerde: Serde[KafkaCheckpointLogKey] = new KafkaCheckpointLogKeySerde) extends CheckpointManager with Logging {

// Upper bound on retries for a failed checkpoint write: once the retry loop's
// sleepCount reaches this value, the write is abandoned with a SamzaException.
// With the default of 50 the total retry duration is approximately 83 minutes.
// Kept as a var so it can be reassigned after construction (tests lower it).
var MaxRetriesOnFailure: Int = 50

info(s"Creating KafkaCheckpointManager for checkpointTopic:$checkpointTopic, systemName:$checkpointSystem " +
s"validateCheckpoints:$validateCheckpoint")

Expand Down Expand Up @@ -159,7 +162,12 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec,
},

(exception, loop) => {
warn(s"Retrying failed checkpoint write to key: $key, checkpoint: $checkpoint for task: $taskName", exception)
if (loop.sleepCount >= MaxRetriesOnFailure) {
error(s"Exhausted $MaxRetriesOnFailure retries when writing checkpoint: $checkpoint for task: $taskName.")
throw new SamzaException(s"Exception when writing checkpoint: $checkpoint for task: $taskName.", exception)
} else {
warn(s"Retrying failed checkpoint write to key: $key, checkpoint: $checkpoint for task: $taskName", exception)
}
}
)
}
Expand Down
Expand Up @@ -33,13 +33,15 @@ import org.apache.samza.checkpoint.Checkpoint
import org.apache.samza.config._
import org.apache.samza.container.TaskName
import org.apache.samza.container.grouper.stream.GroupByPartitionFactory
import org.apache.samza.metrics.MetricsRegistry
import org.apache.samza.serializers.CheckpointSerde
import org.apache.samza.system._
import org.apache.samza.system.kafka.{KafkaStreamSpec, KafkaSystemFactory}
import org.apache.samza.util.{KafkaUtilException, NoOpMetricsRegistry, Util}
import org.apache.samza.{Partition, SamzaException}
import org.junit.Assert._
import org.junit._
import org.mockito.Mockito

class TestKafkaCheckpointManager extends KafkaServerTestHarness {

Expand Down Expand Up @@ -96,6 +98,29 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
assertEquals(checkpoint2, readCheckpoint(checkpointTopic, taskName))
}

/**
 * Expects a SamzaException when a checkpoint is written through a producer
 * whose flush for the task always throws, with the manager's retry limit
 * lowered to 1 so the test does not run for the full default retry duration.
 */
@Test(expected = classOf[SamzaException])
def testWriteCheckpointShouldRetryFiniteTimesOnFailure: Unit = {
  val topicName = "checkpoint-topic-2"
  val topicProps = new org.apache.samza.config.KafkaConfig(config).getCheckpointTopicProperties()
  val streamSpec = new KafkaStreamSpec("id", topicName, checkpointSystemName, 1, 1, false, topicProps)

  // Producer stub: every flush for this task throws a RuntimeException.
  val failingProducer: SystemProducer = Mockito.mock(classOf[SystemProducer])
  Mockito.doThrow(new RuntimeException()).when(failingProducer).flush(taskName.getTaskName)

  // System factory that always hands out the failing producer.
  val failingFactory = new KafkaSystemFactory {
    override def getProducer(systemName: String, config: Config, registry: MetricsRegistry): SystemProducer =
      failingProducer
  }

  val manager = new KafkaCheckpointManager(streamSpec, failingFactory, false, config, new NoOpMetricsRegistry)
  // Cap the retries so the write gives up almost immediately.
  manager.MaxRetriesOnFailure = 1

  manager.register(taskName)
  manager.start
  manager.writeCheckpoint(taskName, new Checkpoint(ImmutableMap.of()))
}

@Test
def testFailOnTopicValidation {
// By default, should fail if there is a topic validation error
Expand Down

0 comments on commit 28191d8

Please sign in to comment.