Skip to content

Commit

Permalink
SAMZA-168; add exponential backoff to kafka system admin
Browse files Browse the repository at this point in the history
  • Loading branch information
ept authored and Chris Riccomini committed Mar 5, 2014
1 parent f574112 commit 5ae0285
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 114 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

package org.apache.samza.util

class ExponentialSleepStrategy(
backOffMultiplier: Double = 2.0,
initialDelayMs: Long = 100,
maximumDelayMs: Long = 10000) {

require(backOffMultiplier > 1.0, "backOffMultiplier must be greater than 1")
require(initialDelayMs > 0, "initialDelayMs must be positive")
require(maximumDelayMs >= initialDelayMs, "maximumDelayMs must be >= initialDelayMs")

var previousDelay = 0L

def sleep() = {
val nextDelay = getNextDelay(previousDelay)
Thread.sleep(nextDelay)
previousDelay = nextDelay
}

def getNextDelay(previousDelay: Long): Long = {
val nextDelay = (previousDelay * backOffMultiplier).asInstanceOf[Long]
math.min(math.max(initialDelayMs, nextDelay), maximumDelayMs)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

package org.apache.samza.util

import org.junit.Assert._
import org.junit.Test
import org.apache.samza.util.ExponentialSleepStrategy

class TestExponentialSleepStrategy {

@Test def testGetNextDelayReturnsIncrementalDelay() = {
val st = new ExponentialSleepStrategy
var nextDelay = st.getNextDelay(0L)
assertEquals(nextDelay, 100L)
nextDelay = st.getNextDelay(nextDelay)
assertEquals(nextDelay, 200L)
nextDelay = st.getNextDelay(nextDelay)
assertEquals(nextDelay, 400L)
}

@Test def testGetNextDelayReturnsMaximumDelayWhenDelayCapReached() = {
val st = new ExponentialSleepStrategy
var nextDelay = st.getNextDelay(6400L)
assertEquals(nextDelay, 10000L)
nextDelay = st.getNextDelay(nextDelay)
assertEquals(nextDelay, 10000L)
}

@Test def testSleepStrategyIsConfigurable() = {
val st = new ExponentialSleepStrategy(backOffMultiplier = 3.0, initialDelayMs = 10)
var nextDelay = st.getNextDelay(0L)
assertEquals(nextDelay, 10L)
nextDelay = st.getNextDelay(nextDelay)
assertEquals(nextDelay, 30L)
nextDelay = st.getNextDelay(nextDelay)
assertEquals(nextDelay, 90L)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import java.util.Map.Entry
import scala.collection.mutable
import kafka.consumer.ConsumerConfig
import org.apache.samza.util.ThreadNamePrefix.SAMZA_THREAD_NAME_PREFIX
import org.apache.samza.util.ExponentialThreadSleepStrategy
import org.apache.samza.util.ExponentialSleepStrategy

/**
* Companion object for class JvmMetrics encapsulating various constants
Expand Down Expand Up @@ -125,7 +125,7 @@ class BrokerProxy(
val thread: Thread = new Thread(new Runnable() {
def run() {
info("Initialising sleep strategy");
val sleepStrategy = new ExponentialThreadSleepStrategy
val sleepStrategy = new ExponentialSleepStrategy
info("Starting thread for BrokerProxy")

while (!Thread.currentThread.isInterrupted) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import org.apache.samza.system.SystemAdmin
import org.apache.samza.system.SystemStreamMetadata
import org.apache.samza.system.SystemStreamPartition
import org.apache.samza.util.ClientUtilTopicMetadataStore
import org.apache.samza.util.ExponentialSleepStrategy
import kafka.api._
import kafka.consumer.SimpleConsumer
import kafka.utils.Utils
Expand Down Expand Up @@ -122,6 +123,7 @@ class KafkaSystemAdmin(
var upcomingOffsets = Map[SystemStreamPartition, String]()
var done = false
var consumer: SimpleConsumer = null
val retryBackoff = new ExponentialSleepStrategy(initialDelayMs = 500)

debug("Fetching offsets for: %s" format streams)

Expand Down Expand Up @@ -176,6 +178,7 @@ class KafkaSystemAdmin(
// Retry.
warn("Unable to fetch last offsets for streams due to: %s, %s. Retrying. Turn on debugging to get a full stack trace." format (e.getMessage, streams))
debug(e)
retryBackoff.sleep
}
}

Expand Down

This file was deleted.

This file was deleted.

0 comments on commit 5ae0285

Please sign in to comment.