Skip to content

Commit

Permalink
Move StreamRouterEngine#matchWithTimeOut() into StreamRouterEngine.Rule
Browse files Browse the repository at this point in the history
  • Loading branch information
Jochen Schalanda committed Jul 17, 2015
1 parent cc9a113 commit 02a18ea
Showing 1 changed file with 22 additions and 21 deletions.
Expand Up @@ -163,7 +163,7 @@ public List<Stream> match(Message message) {
if (streamRuleType != StreamRuleType.REGEX) {
stream = rule.match(message);
} else {
stream = matchWithTimeOut(message, rule);
stream = rule.matchWithTimeOut(message, streamProcessingTimeout, TimeUnit.MILLISECONDS);
}

final Stream.MatchingType matchingType = rule.getMatchingType();
Expand All @@ -189,26 +189,6 @@ public List<Stream> match(Message message) {
return ImmutableList.copyOf(result);
}

@Nullable
private Stream matchWithTimeOut(final Message message, final Rule rule) {
Stream matchedStream = null;
try (final Timer.Context ignored = streamMetrics.getExecutionTimer(rule.getStreamRule().getId()).time()) {
matchedStream = timeLimiter.callWithTimeout(new Callable<Stream>() {
@Override
public Stream call() throws Exception {
return rule.match(message);
}
}, streamProcessingTimeout, TimeUnit.MILLISECONDS, true);
} catch (UncheckedTimeoutException e) {
streamFaultManager.registerFailure(rule.getStream());
} catch (Exception e) {
LOG.warn("Unexpected error during stream matching: ", e);
streamMetrics.markExceptionMeter(rule.getStream().getId());
}

return matchedStream;
}

/**
* Returns a list of stream rule matches. Can be used to test streams and stream rule matches.
* This is meant for testing, do NOT use in production processing pipeline! (use {@link #match(org.graylog2.plugin.Message) match} instead)
Expand Down Expand Up @@ -273,6 +253,27 @@ public Stream match(Message message) {
}
}

@Nullable
private Stream matchWithTimeOut(final Message message, long timeout, TimeUnit unit) {
Stream matchedStream = null;
try (final Timer.Context ignored = streamMetrics.getExecutionTimer(rule.getId()).time()) {
matchedStream = timeLimiter.callWithTimeout(new Callable<Stream>() {
@Override
@Nullable
public Stream call() throws Exception {
return match(message);
}
}, timeout, unit, true);
} catch (UncheckedTimeoutException e) {
streamFaultManager.registerFailure(stream);
} catch (Exception e) {
LOG.warn("Unexpected error during stream matching", e);
streamMetrics.markExceptionMeter(rule.getStreamId());
}

return matchedStream;
}

public StreamRule getStreamRule() {
return rule;
}
Expand Down

0 comments on commit 02a18ea

Please sign in to comment.