Skip to content

Commit

Permalink
Improved queue-status logging
Browse files Browse the repository at this point in the history
Now displays queue fill level, waiting workers, active workers and hung
workers.
  • Loading branch information
hylkevds committed May 26, 2023
1 parent d3f5e75 commit 9805e08
Show file tree
Hide file tree
Showing 6 changed files with 265 additions and 21 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
**New Features**
* Improved configuration options for logging.
* Implemented comparing to null: `$filter=property eq null` and `$filter=property ne null`.
* Improved queue-status logging.

**Internal changes & Bugfixes**
* Fixed performance degradation caused by HTTP-instances listening on the message bus.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package de.fraunhofer.iosb.ilt.frostserver.messagebus;

import static de.fraunhofer.iosb.ilt.frostserver.settings.CoreSettings.PREFIX_BUS;
import static de.fraunhofer.iosb.ilt.frostserver.util.ProcessorHelper.Processor.Status.WAITING;
import static de.fraunhofer.iosb.ilt.frostserver.util.ProcessorHelper.Processor.Status.WORKING;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand All @@ -33,8 +35,12 @@
import de.fraunhofer.iosb.ilt.frostserver.settings.annotation.DefaultValueInt;
import de.fraunhofer.iosb.ilt.frostserver.util.ChangingStatusLogger;
import de.fraunhofer.iosb.ilt.frostserver.util.ProcessorHelper;
import de.fraunhofer.iosb.ilt.frostserver.util.ProcessorHelper.Processor;
import de.fraunhofer.iosb.ilt.frostserver.util.StringHelper;
import java.io.IOException;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
Expand Down Expand Up @@ -91,16 +97,21 @@ public class MqttMessageBus implements MessageBus, MqttCallback, ConfigDefaults
private int sendQueueSize;
private int recvPoolSize;
private int recvQueueSize;

private BlockingQueue<EntityChangedMessage> sendQueue;
private ExecutorService sendService;
private List<Processor<EntityChangedMessage>> sendProcessors = new ArrayList<>();

private BlockingQueue<EntityChangedMessage> recvQueue;
private ExecutorService recvService;
private List<Processor<EntityChangedMessage>> recvProcessors = new ArrayList<>();

private ScheduledExecutorService maintenanceTimer;
private final List<MessageListener> listeners = new CopyOnWriteArrayList<>();

private final ChangingStatusLogger statusLogger = new ChangingStatusLogger(LOGGER);
private final AtomicInteger sendQueueCount = new AtomicInteger();
private final LoggingStatus logStatus = new LoggingStatus();
private final LoggingStatus logStatus = new LoggingStatus(this::checkWorkers);

private String broker;
private final String clientId = "FROST-MQTT-Bus-" + UUID.randomUUID();
Expand All @@ -127,15 +138,16 @@ public void init(CoreSettings settings) {
sendPoolSize,
sendQueue,
this::handleMessageSent,
"mqtt-BusS");
logStatus.setSendQueueSize(sendQueueSize);
"mqtt-BusS",
sendProcessors);

recvQueue = new ArrayBlockingQueue<>(recvQueueSize);
recvService = ProcessorHelper.createProcessors(
recvPoolSize,
recvQueue,
this::handleMessageReceived,
"mqtt-BusR");
"mqtt-BusR",
recvProcessors);

broker = customSettings.get(TAG_MQTT_BROKER, getClass());
topicName = customSettings.get(TAG_TOPIC_NAME, getClass());
Expand Down Expand Up @@ -347,27 +359,116 @@ private void handleMessageReceived(EntityChangedMessage message) {
}
}

private void checkWorkers() {
int recvWaiting = 0;
int recvWorking = 0;
int recvBroken = 0;
int sendWaiting = 0;
int sendWorking = 0;
int sendBroken = 0;
Instant threshold = Instant.now().minus(2, ChronoUnit.SECONDS);
for (Processor<EntityChangedMessage> processor : recvProcessors) {
switch (processor.getStatus()) {
case WAITING:
recvWaiting++;
break;

case WORKING:
if (!processor.isFine(threshold)) {
recvBroken++;
} else {
recvWorking++;
}
break;

default:
LOGGER.trace("Worker not started.");
}
}
for (Processor<EntityChangedMessage> processor : sendProcessors) {
switch (processor.getStatus()) {
case WAITING:
sendWaiting++;
break;

case WORKING:
if (!processor.isFine(threshold)) {
sendBroken++;
} else {
sendWorking++;
}
break;

default:
LOGGER.trace("Worker not started.");
}
}
logStatus.setRecvWaiting(recvWaiting)
.setRecvWorking(recvWorking)
.setRecvBad(recvBroken)
.setSendWaiting(sendWaiting)
.setSendWorking(sendWorking)
.setSendBad(sendBroken);
}

private static class LoggingStatus extends ChangingStatusLogger.ChangingStatusDefault {

public static final String MESSAGE = "sendQueue: {} of {}";
public static final String MESSAGE = "RecvQueue: {} [{}, {}, {}] SendQueue: {} [{}, {}, {}] ";
public final Object[] status;
private final Runnable processor;

public LoggingStatus() {
super(MESSAGE, new Object[2]);
public LoggingStatus(Runnable processor) {
super(MESSAGE, new Object[8]);
status = getCurrentParams();
Arrays.setAll(status, (int i) -> 0);
this.processor = processor;
}

public LoggingStatus setSendQueueCount(Integer count) {
@Override
public void process() {
processor.run();
}

public LoggingStatus setRecvQueueCount(Integer count) {
status[0] = count;
return this;
}

public LoggingStatus setSendQueueSize(Integer size) {
public LoggingStatus setRecvWaiting(Integer size) {
status[1] = size;
return this;
}

public LoggingStatus setRecvWorking(Integer size) {
status[2] = size;
return this;
}

public LoggingStatus setRecvBad(Integer size) {
status[3] = size;
return this;
}

public LoggingStatus setSendQueueCount(Integer count) {
status[4] = count;
return this;
}

public LoggingStatus setSendWaiting(Integer size) {
status[5] = size;
return this;
}

public LoggingStatus setSendWorking(Integer size) {
status[6] = size;
return this;
}

public LoggingStatus setSendBad(Integer size) {
status[7] = size;
return this;
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
*/
package de.fraunhofer.iosb.ilt.frostserver.mqtt;

import static de.fraunhofer.iosb.ilt.frostserver.util.ProcessorHelper.Processor.Status.WAITING;
import static de.fraunhofer.iosb.ilt.frostserver.util.ProcessorHelper.Processor.Status.WORKING;

import de.fraunhofer.iosb.ilt.frostserver.messagebus.MessageListener;
import de.fraunhofer.iosb.ilt.frostserver.model.EntityChangedMessage;
import de.fraunhofer.iosb.ilt.frostserver.model.EntityType;
Expand All @@ -42,9 +45,14 @@
import de.fraunhofer.iosb.ilt.frostserver.settings.UnknownVersionException;
import de.fraunhofer.iosb.ilt.frostserver.util.ChangingStatusLogger;
import de.fraunhofer.iosb.ilt.frostserver.util.ProcessorHelper;
import de.fraunhofer.iosb.ilt.frostserver.util.ProcessorHelper.Processor;
import java.io.IOException;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
Expand All @@ -71,14 +79,17 @@ public class MqttManager implements SubscriptionListener, MessageListener, Entit
private MqttServer server;
private BlockingQueue<EntityChangedMessage> entityChangedEventQueue;
private ExecutorService entityChangedExecutorService;
private final List<Processor<EntityChangedMessage>> entityChangedProcessors = new ArrayList<>();

private BlockingQueue<EntityCreateEvent> entityCreateEventQueue;
private ExecutorService entityCreateExecutorService;
private final List<Processor<EntityCreateEvent>> entityCreateProcessors = new ArrayList<>();

private final ChangingStatusLogger statusLogger = new ChangingStatusLogger(LOGGER);
private final AtomicInteger topicCount = new AtomicInteger();
private final AtomicInteger entityChangedQueueSize = new AtomicInteger();
private final AtomicInteger entityCreateQueueSize = new AtomicInteger();
private final LoggingStatus logStatus = new LoggingStatus();
private final LoggingStatus logStatus = new LoggingStatus(this::checkWorkers);

private boolean enabledMqtt = false;
private boolean shutdown = false;
Expand Down Expand Up @@ -109,14 +120,16 @@ private void init() {
mqttSettings.getSubscribeThreadPoolSize(),
entityChangedEventQueue,
this::handleEntityChangedEvent,
"Mqtt-EntityChangedProcessor");
"Mqtt-EntityChangedProcessor",
entityChangedProcessors);
// start watching for EntityCreateEvents
entityCreateEventQueue = new ArrayBlockingQueue<>(mqttSettings.getCreateMessageQueueSize());
entityCreateExecutorService = ProcessorHelper.createProcessors(
mqttSettings.getCreateThreadPoolSize(),
entityCreateEventQueue,
this::handleEntityCreateEvent,
"Mqtt-EntityCreateProcessor");
"Mqtt-EntityCreateProcessor",
entityCreateProcessors);
// start MQTT server
server = MqttServerFactory.getInstance().get(settings);
server.addSubscriptionListener(this);
Expand Down Expand Up @@ -264,6 +277,58 @@ public void onEntityCreate(EntityCreateEvent e) {
}
}

private void checkWorkers() {
int cngWaiting = 0;
int cngWorking = 0;
int cngBroken = 0;
int crtWaiting = 0;
int crtWorking = 0;
int crtBroken = 0;
Instant threshold = Instant.now().minus(2, ChronoUnit.SECONDS);
for (Processor<EntityChangedMessage> processor : entityChangedProcessors) {
switch (processor.getStatus()) {
case WAITING:
cngWaiting++;
break;

case WORKING:
if (!processor.isFine(threshold)) {
cngBroken++;
} else {
cngWorking++;
}
break;

default:
LOGGER.trace("Worker not started.");
}
}
for (Processor<EntityCreateEvent> processor : entityCreateProcessors) {
switch (processor.getStatus()) {
case WAITING:
crtWaiting++;
break;

case WORKING:
if (!processor.isFine(threshold)) {
crtBroken++;
} else {
crtWorking++;
}
break;

default:
LOGGER.trace("Worker not started.");
}
}
logStatus.setEntityChangedWaiting(cngWaiting)
.setEntityChangedWorking(cngWorking)
.setEntityChangedBad(cngBroken)
.setEntityCreateWaiting(crtWaiting)
.setEntityCreateWorking(crtWorking)
.setEntityCreateBad(crtBroken);
}

public static Version getVersionFromTopic(CoreSettings settings, String topic) throws UnknownVersionException {
int pos = topic.indexOf('/');
if (pos == -1) {
Expand All @@ -279,27 +344,64 @@ public static Version getVersionFromTopic(CoreSettings settings, String topic) t

private static class LoggingStatus extends ChangingStatusLogger.ChangingStatusDefault {

public static final String MESSAGE = "entityCreateQueue: {}, entityChangedQueue: {}, topics: {}";
public static final String MESSAGE = "entityCreateQueue: {} [{}, {}, {}] entityChangedQueue: {} [{}, {}, {}] topics: {}";
public final Object[] status;
private final Runnable processor;

public LoggingStatus() {
super(MESSAGE, new Object[3]);
public LoggingStatus(Runnable processor) {
super(MESSAGE, new Object[9]);
status = getCurrentParams();
Arrays.setAll(status, (int i) -> 0);
this.processor = processor;
}

@Override
public void process() {
processor.run();
}

public LoggingStatus setEntityCreateQueueSize(Integer size) {
status[0] = size;
return this;
}

public LoggingStatus setEntityChangedQueueSize(Integer size) {
public LoggingStatus setEntityCreateWaiting(Integer size) {
status[1] = size;
return this;
}

public LoggingStatus setEntityCreateWorking(Integer size) {
status[2] = size;
return this;
}

public LoggingStatus setEntityCreateBad(Integer size) {
status[3] = size;
return this;
}

public LoggingStatus setEntityChangedQueueSize(Integer size) {
status[4] = size;
return this;
}

public LoggingStatus setEntityChangedWaiting(Integer size) {
status[5] = size;
return this;
}

public LoggingStatus setEntityChangedWorking(Integer size) {
status[6] = size;
return this;
}

public LoggingStatus setEntityChangedBad(Integer size) {
status[7] = size;
return this;
}

public LoggingStatus setTopicCount(Integer count) {
status[2] = count;
status[8] = count;
return this;
}

Expand Down
Loading

0 comments on commit 9805e08

Please sign in to comment.