Skip to content

Commit

Permalink
add #12 未処理のメッセージ数と1メッセージの処理時間を送信するPingSenderの実装
Browse files Browse the repository at this point in the history
  • Loading branch information
TodorokiKohei committed Jul 11, 2023
1 parent d933c54 commit 59aaab9
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ public static void main(String[] args) {
mt.waitForCompletion();
System.out.println("Connected");

Thread.sleep(1000);
for (int i = 0; i < 10; i++) {
Thread.sleep(5000);
for (int i = 0; i < 20; i++) {
MqttMessage ms = new MqttMessage("Hello World".getBytes());
ms.setQos(1);
client.publish("t", ms);
Thread.sleep(1000);
Thread.sleep(500);
}

System.out.println("Wait: please enter");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package org.eclipse.paho.sample.mqttv5app;


import com.fasterxml.jackson.core.JsonProcessingException;
import org.eclipse.paho.mqttv5.client.ExtendedPingSender;
import org.eclipse.paho.mqttv5.client.logging.Logger;
import org.eclipse.paho.mqttv5.client.logging.LoggerFactory;
import org.eclipse.paho.mqttv5.common.packet.MqttPingReq;

import java.util.ArrayList;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

public class StatusPingSender extends ExtendedPingSender {
private static final String CLASS_NAME = StatusPingSender.class.getName();

private int bufferSize;
private ArrayList<Double> processingTimeBuffer;
private double processingTimePerMsg;

private ObjectMapper mapper;

public StatusPingSender(int bufferSize) {
this.bufferSize = bufferSize;
processingTimeBuffer = new ArrayList<>(bufferSize);
processingTimePerMsg = 0;
mapper = new ObjectMapper();
}

public void updateProcessingTimePerMsg(double processingTime){
final String methodName = "updateProcessingTimePerMsg";

double totalProcessingTime = processingTimePerMsg * processingTimeBuffer.size();
if (processingTimeBuffer.size() == bufferSize) {
double element = processingTimeBuffer.remove(0);
totalProcessingTime -= element;
log.info(CLASS_NAME, methodName, "Buffer is full so removed first element({0}).", new Object[]{element});
}
totalProcessingTime += processingTime;
processingTimeBuffer.add(new Double(processingTime));
log.info(CLASS_NAME, methodName, "Added {0} to the end of buffer.", new Object[]{processingTime});

processingTimePerMsg = totalProcessingTime / processingTimeBuffer.size();
}

@Override
protected MqttPingReq createPingreq() {
final String methodName = "cratePingreq";

Payload payload = new Payload(comms.getNumberOfMsgsUnprocessed(), processingTimePerMsg);
try {
String jsonPayload = mapper.writeValueAsString(payload);
MqttPingReq pingReq = new MqttPingReq(jsonPayload.getBytes());
log.info(CLASS_NAME, methodName, "Create PINGREQ payload: {0}.", new Object[]{jsonPayload});
return pingReq;
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}

class Payload {
@JsonProperty("numberOfMsgsInQueue")
public int numberOfMsgsInQueue;

@JsonProperty("processingTimerPerMsg")
public double processingTimerPerMsg;

public Payload(int numberOfMsgsInQueue, double processingTimerPerMsg){
this.numberOfMsgsInQueue = numberOfMsgsInQueue;
this.processingTimerPerMsg = processingTimerPerMsg;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import java.io.InputStreamReader;

public class Subscriber implements MqttCallback {
static private StatusPingSender pingSender;

public static void main(String[] args) {

String url = "tcp://localhost:1883";
Expand All @@ -20,12 +22,15 @@ public static void main(String[] args) {
MqttConnectionOptions conOpts = new MqttConnectionOptions();
conOpts.setKeepAliveInterval(2);

ExtendedPingSender pingSender = new SamplePingSender("Hello World!!");
// ExtendedPingSender pingSender = new SamplePingSender("Hello World!!");
pingSender = new StatusPingSender(100);
pingSender.setPingIntervalMilliSeconds(1000);

MqttAsyncClient client = null;
try {
client = new MqttAsyncClient(url, clientId, persistence, pingSender, null);
client.resizeReceiverQueueSize(100);

Subscriber sub = new Subscriber();
client.setCallback(sub);

Expand Down Expand Up @@ -62,7 +67,11 @@ public void mqttErrorOccurred(MqttException exception) {

@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
System.out.println("Message Arrived");
long startTime = System.currentTimeMillis();
Thread.sleep(800);
long processingTime = System.currentTimeMillis() - startTime;
pingSender.updateProcessingTimePerMsg(processingTime / 1000.0);

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ org.eclipse.paho.mqttv5.client.level=INFO
# It is possible to set more granular trace on a per class basis e.g.
#org.eclipse.paho.mqttv5.client.internal.ClientComms.level=ALL

org.eclipse.paho.sample.mqttv5app.handlers=java.util.logging.MemoryHandler
org.eclipse.paho.sample.mqttv5app.level=INFO

# Handlers
# -----------------------------------------
# Note: the target handler that is associated with the MemoryHandler is not a root handler
Expand Down

0 comments on commit 59aaab9

Please sign in to comment.