Skip to content

Commit

Permalink
FLUME-1541. Implement failover for LoadBalancingSinkProcessor.
Browse files Browse the repository at this point in the history
(Juhani Connolly via Mike Percy)
  • Loading branch information
mpercy committed Sep 6, 2012
1 parent 80699c4 commit d36861b
Show file tree
Hide file tree
Showing 4 changed files with 378 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,9 @@ public void setSinks(List<Sink> sinks) {
protected List<Sink> getSinks() {
return sinkList;
}

@Override
public void informSinkFailed(Sink failedSink) {
// no-op
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
package org.apache.flume.sink;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;

import org.apache.flume.Context;
Expand Down Expand Up @@ -78,13 +80,13 @@
* @see LoadBalancingSinkProcessor.SinkSelector
*/
public class LoadBalancingSinkProcessor extends AbstractSinkProcessor {

public static final String CONFIG_SELECTOR = "selector";
public static final String CONFIG_SELECTOR_PREFIX = CONFIG_SELECTOR + ".";

public static final String SELECTOR_NAME_ROUND_ROBIN = "ROUND_ROBIN";
public static final String SELECTOR_NAME_RANDOM = "RANDOM";

public static final String SELECTOR_NAME_ROUND_ROBIN_BACKOFF = "ROUND_ROBIN_BACKOFF";
public static final String SELECTOR_NAME_RANDOM_BACKOFF = "RANDOM_BACKOFF";

private static final Logger LOGGER = LoggerFactory
.getLogger(LoadBalancingSinkProcessor.class);
Expand All @@ -106,6 +108,10 @@ public void configure(Context context) {
selector = new RoundRobinSinkSelector();
} else if (selectorTypeName.equalsIgnoreCase(SELECTOR_NAME_RANDOM)) {
selector = new RandomOrderSinkSelector();
} else if (selectorTypeName.equalsIgnoreCase(SELECTOR_NAME_ROUND_ROBIN_BACKOFF)) {
selector = new BackoffRoundRobinSinkSelector();
} else if (selectorTypeName.equalsIgnoreCase(SELECTOR_NAME_RANDOM_BACKOFF)) {
selector = new BackoffRandomOrderSinkSelector();
} else {
try {
@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -151,6 +157,7 @@ public Status process() throws EventDeliveryException {
status = sink.process();
break;
} catch (Exception ex) {
selector.informSinkFailed(sink);
LOGGER.warn("Sink failed to consume event. "
+ "Attempting next sink if available.", ex);
}
Expand Down Expand Up @@ -191,6 +198,8 @@ public interface SinkSelector extends Configurable, LifecycleAware {
void setSinks(List<Sink> sinks);

Iterator<Sink> createSinkIterator();

void informSinkFailed(Sink failedSink);
}

/**
Expand Down Expand Up @@ -248,4 +257,131 @@ public Iterator<Sink> createSinkIterator() {
return new SpecificOrderIterator<Sink>(indexOrder, getSinks());
}
}

private static class FailureState {
long lastFail;
long restoreTime;
int sequentialFails;
}

public static abstract class AbstractBackoffSinkSelector extends AbstractSinkSelector {
// 2 ^ 16 seconds should be more than enough for an upper limit...
private static final int EXP_BACKOFF_COUNTER_LIMIT = 16;
private static final String CONF_MAX_TIMEOUT = "maxBackoffMillis";
private static final long CONSIDER_SEQUENTIAL_RANGE = 2000l;
private static final long MAX_TIMEOUT = 30000l;

protected List<FailureState> sinkStates;
protected Map<Sink, FailureState> stateMap;
protected long maxTimeout = MAX_TIMEOUT;

@Override
public void configure(Context context) {
super.configure(context);
maxTimeout = context.getLong(CONF_MAX_TIMEOUT, MAX_TIMEOUT);
}

@Override
public void setSinks(List<Sink> sinks) {
super.setSinks(sinks);
sinkStates = new ArrayList<FailureState>();
stateMap = new HashMap<Sink, FailureState>();
for(Sink sink : sinks) {
FailureState state = new FailureState();
sinkStates.add(state);
stateMap.put(sink, state);
}
}

@Override
public void informSinkFailed(Sink failedSink) {
super.informSinkFailed(failedSink);
FailureState state = stateMap.get(failedSink);
long now = System.currentTimeMillis();
long delta = now - state.lastFail;

long lastBackoffLength = Math.min(MAX_TIMEOUT, 1000 * (1 << state.sequentialFails));
long allowableDiff = lastBackoffLength + CONSIDER_SEQUENTIAL_RANGE;
if( allowableDiff > delta ) {
if(state.sequentialFails < EXP_BACKOFF_COUNTER_LIMIT)
state.sequentialFails++;
} else {
state.sequentialFails = 1;
}
state.lastFail = now;
state.restoreTime = now + Math.min(MAX_TIMEOUT, 1000 * (1 << state.sequentialFails));
}

}


private static class BackoffRoundRobinSinkSelector extends AbstractBackoffSinkSelector {
private int nextHead = 0;

@Override
public Iterator<Sink> createSinkIterator() {
long curTime = System.currentTimeMillis();
List<Integer> activeIndices = new ArrayList<Integer>();
int index = 0;
for(FailureState state : sinkStates) {
if (state.restoreTime < curTime) {
activeIndices.add(index);
}
index++;
}

int size = activeIndices.size();
// possible that the size has shrunk so gotta adjust nextHead for that
if(nextHead >= size) {
nextHead = 0;
}
int begin = nextHead++;
if (nextHead == activeIndices.size()) {
nextHead = 0;
}

int[] indexOrder = new int[size];

for (int i=0; i < size; i++) {
indexOrder[i] = activeIndices.get((begin + i) % size);
}

return new SpecificOrderIterator<Sink>(indexOrder, getSinks());
}
}

/**
* A sink selector that implements a random sink selection policy. This
* implementation is not thread safe.
*/
private static class BackoffRandomOrderSinkSelector extends AbstractBackoffSinkSelector {
private Random random = new Random(System.currentTimeMillis());

@Override
public Iterator<Sink> createSinkIterator() {
long now = System.currentTimeMillis();

List<Integer> indexList = new ArrayList<Integer>();

int i = 0;
for (FailureState state : sinkStates) {
if(state.restoreTime < now)
indexList.add(i);
i++;
}

int size = indexList.size();
int[] indexOrder = new int[size];

while (indexList.size() != 1) {
int pick = random.nextInt(indexList.size());
indexOrder[indexList.size() - 1] = indexList.remove(pick);
}

indexOrder[0] = indexList.get(0);

return new SpecificOrderIterator<Sink>(indexOrder, getSinks());
}
}

}
Loading

0 comments on commit d36861b

Please sign in to comment.