Skip to content

Commit

Permalink
Unbreak input throttling by publishing throttle state again (#4849)
Browse files Browse the repository at this point in the history
This broke in PR #1948 where I refactored the cluster event handling and removed the throttle state publishing by accident.

Fixes #4321
Refs #1948

**Note:** This should be cherry-picked into 2.4 as well.
  • Loading branch information
bernd authored and kroepke committed Jun 18, 2018
1 parent 99fafab commit 060bd15
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 1 deletion.
Expand Up @@ -20,6 +20,7 @@
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.RatioGauge;
import com.github.joschi.jadconfig.util.Size;
import com.google.common.eventbus.EventBus;
import org.graylog2.notifications.Notification;
import org.graylog2.notifications.NotificationService;
import org.graylog2.plugin.GlobalMetricNames;
Expand Down Expand Up @@ -54,6 +55,7 @@ public class ThrottleStateUpdaterThread extends Periodical {
private static final Logger log = LoggerFactory.getLogger(ThrottleStateUpdaterThread.class);
private final KafkaJournal journal;
private final ProcessBuffer processBuffer;
private final EventBus eventBus;
private final Size retentionSize;
private final NotificationService notificationService;
private final ServerStatus serverStatus;
Expand All @@ -67,11 +69,13 @@ public class ThrottleStateUpdaterThread extends Periodical {
@Inject
public ThrottleStateUpdaterThread(final Journal journal,
ProcessBuffer processBuffer,
EventBus eventBus,
NotificationService notificationService,
ServerStatus serverStatus,
MetricRegistry metricRegistry,
@Named("message_journal_max_size") Size retentionSize) {
this.processBuffer = processBuffer;
this.eventBus = eventBus;
this.retentionSize = retentionSize;
this.notificationService = notificationService;
this.serverStatus = serverStatus;
Expand Down Expand Up @@ -228,6 +232,9 @@ public void doRun() {
// the journal needs this to provide information to rest clients
journal.setThrottleState(throttleState);

// publish to interested parties
eventBus.post(throttleState);

// Abusing the current thread to send notifications from KafkaJournal in the graylog2-shared module
final double journalUtilizationPercentage = throttleState.journalSizeLimit > 0 ? (throttleState.journalSize * 100) / throttleState.journalSizeLimit : 0.0;

Expand Down
Expand Up @@ -26,7 +26,7 @@ public class DeadEventLoggingListener {

@Subscribe
public void handleDeadEvent(DeadEvent event) {
LOGGER.warn("Received unhandled event of type <{}> from event bus <{}>", event.getEvent().getClass().getCanonicalName(),
LOGGER.debug("Received unhandled event of type <{}> from event bus <{}>", event.getEvent().getClass().getCanonicalName(),
event.getSource().toString());
LOGGER.debug("Dead event contents: {}", event.getEvent());
}
Expand Down

0 comments on commit 060bd15

Please sign in to comment.