Skip to content

Commit

Permalink
SAMZA-1572: Add fixed retries on failure in KafkaCheckpointManager.
Browse files Browse the repository at this point in the history
  • Loading branch information
shanthoosh committed Feb 8, 2018
1 parent 2e04e17 commit d88ccb1
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 1 deletion.
Expand Up @@ -53,6 +53,8 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec,
checkpointMsgSerde: Serde[Checkpoint] = new CheckpointSerde,
checkpointKeySerde: Serde[KafkaCheckpointLogKey] = new KafkaCheckpointLogKeySerde) extends CheckpointManager with Logging {

var MAX_RETRIES_ON_FAILURE = 50

info(s"Creating KafkaCheckpointManager for checkpointTopic:$checkpointTopic, systemName:$checkpointSystem " +
s"validateCheckpoints:$validateCheckpoint")

Expand Down Expand Up @@ -157,7 +159,12 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec,
},

(exception, loop) => {
warn(s"Retrying failed checkpoint write to key: $key, checkpoint: $checkpoint for task: $taskName", exception)
if (loop.sleepCount >= MAX_RETRIES_ON_FAILURE) {
error(s"Exhausted $MAX_RETRIES_ON_FAILURE retries when writing checkpoint: $checkpoint for task: $taskName.")
throw new SamzaException(s"Exception when writing checkpoint: $checkpoint for task: $taskName.", exception)
} else {
warn(s"Retrying failed checkpoint write to key: $key, checkpoint: $checkpoint for task: $taskName", exception)
}
}
)
}
Expand Down
Expand Up @@ -33,13 +33,15 @@ import org.apache.samza.checkpoint.Checkpoint
import org.apache.samza.config._
import org.apache.samza.container.TaskName
import org.apache.samza.container.grouper.stream.GroupByPartitionFactory
import org.apache.samza.metrics.MetricsRegistry
import org.apache.samza.serializers.CheckpointSerde
import org.apache.samza.system._
import org.apache.samza.system.kafka.{KafkaStreamSpec, KafkaSystemFactory}
import org.apache.samza.util.{KafkaUtilException, NoOpMetricsRegistry, Util}
import org.apache.samza.{Partition, SamzaException}
import org.junit.Assert._
import org.junit._
import org.mockito.Mockito

class TestKafkaCheckpointManager extends KafkaServerTestHarness {

Expand Down Expand Up @@ -96,6 +98,29 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
assertEquals(checkpoint2, readCheckpoint(checkpointTopic, taskName))
}

@Test(expected = classOf[SamzaException])
def testWriteCheckpointShouldRetryFiniteTimesOnFailure: Unit = {
val checkpointTopic = "checkpoint-topic-2"
val mockKafkaProducer: SystemProducer = Mockito.mock(classOf[SystemProducer])

class MockSystemFactory extends KafkaSystemFactory {
override def getProducer(systemName: String, config: Config, registry: MetricsRegistry): SystemProducer = {
mockKafkaProducer
}
}

Mockito.doThrow(new RuntimeException()).when(mockKafkaProducer).flush(taskName.getTaskName)

val props = new org.apache.samza.config.KafkaConfig(config).getCheckpointTopicProperties()
val spec = new KafkaStreamSpec("id", checkpointTopic, checkpointSystemName, 1, 1, false, props)
val checkPointManager = new KafkaCheckpointManager(spec, new MockSystemFactory, false, config, new NoOpMetricsRegistry)
checkPointManager.MAX_RETRIES_ON_FAILURE = 1

checkPointManager.register(taskName)
checkPointManager.start
checkPointManager.writeCheckpoint(taskName, new Checkpoint(ImmutableMap.of()))
}

@Test
def testFailOnTopicValidation {
// By default, should fail if there is a topic validation error
Expand Down

0 comments on commit d88ccb1

Please sign in to comment.