Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SAMZA-1572: Add fixed retries on failure in KafkaCheckpointManager. #420

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -53,6 +53,9 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec,
checkpointMsgSerde: Serde[Checkpoint] = new CheckpointSerde,
checkpointKeySerde: Serde[KafkaCheckpointLogKey] = new KafkaCheckpointLogKeySerde) extends CheckpointManager with Logging {

// Retry duration is approximately 83 minutes.
var MaxRetriesOnFailure = 50

info(s"Creating KafkaCheckpointManager for checkpointTopic:$checkpointTopic, systemName:$checkpointSystem " +
s"validateCheckpoints:$validateCheckpoint")

Expand Down Expand Up @@ -157,7 +160,12 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec,
},

(exception, loop) => {
warn(s"Retrying failed checkpoint write to key: $key, checkpoint: $checkpoint for task: $taskName", exception)
if (loop.sleepCount >= MaxRetriesOnFailure) {
error(s"Exhausted $MaxRetriesOnFailure retries when writing checkpoint: $checkpoint for task: $taskName.")
throw new SamzaException(s"Exception when writing checkpoint: $checkpoint for task: $taskName.", exception)
} else {
warn(s"Retrying failed checkpoint write to key: $key, checkpoint: $checkpoint for task: $taskName", exception)
}
}
)
}
Expand Down
Expand Up @@ -33,13 +33,15 @@ import org.apache.samza.checkpoint.Checkpoint
import org.apache.samza.config._
import org.apache.samza.container.TaskName
import org.apache.samza.container.grouper.stream.GroupByPartitionFactory
import org.apache.samza.metrics.MetricsRegistry
import org.apache.samza.serializers.CheckpointSerde
import org.apache.samza.system._
import org.apache.samza.system.kafka.{KafkaStreamSpec, KafkaSystemFactory}
import org.apache.samza.util.{KafkaUtilException, NoOpMetricsRegistry, Util}
import org.apache.samza.{Partition, SamzaException}
import org.junit.Assert._
import org.junit._
import org.mockito.Mockito

class TestKafkaCheckpointManager extends KafkaServerTestHarness {

Expand Down Expand Up @@ -96,6 +98,29 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
assertEquals(checkpoint2, readCheckpoint(checkpointTopic, taskName))
}

@Test(expected = classOf[SamzaException])
def testWriteCheckpointShouldRetryFiniteTimesOnFailure: Unit = {
val checkpointTopic = "checkpoint-topic-2"
val mockKafkaProducer: SystemProducer = Mockito.mock(classOf[SystemProducer])

class MockSystemFactory extends KafkaSystemFactory {
override def getProducer(systemName: String, config: Config, registry: MetricsRegistry): SystemProducer = {
mockKafkaProducer
}
}

Mockito.doThrow(new RuntimeException()).when(mockKafkaProducer).flush(taskName.getTaskName)

val props = new org.apache.samza.config.KafkaConfig(config).getCheckpointTopicProperties()
val spec = new KafkaStreamSpec("id", checkpointTopic, checkpointSystemName, 1, 1, false, props)
val checkPointManager = new KafkaCheckpointManager(spec, new MockSystemFactory, false, config, new NoOpMetricsRegistry)
checkPointManager.MaxRetriesOnFailure = 1

checkPointManager.register(taskName)
checkPointManager.start
checkPointManager.writeCheckpoint(taskName, new Checkpoint(ImmutableMap.of()))
}

@Test
def testFailOnTopicValidation {
// By default, should fail if there is a topic validation error
Expand Down