Skip to content

Commit

Permalink
Suppress display of sender/receiver counters when channel is IPC
Browse files Browse the repository at this point in the history
  • Loading branch information
epickrram committed May 28, 2019
1 parent 42092b3 commit fadc5d4
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 25 deletions.
2 changes: 1 addition & 1 deletion README.md
Expand Up @@ -60,5 +60,5 @@ Aether-Net can be launched in local mode:
$ java -cp /path/to/aether-net-all.jar \
-Daether.monitoringLocations=A:/path/to/A/media-driver;B:/path/to/B/media-driver \
-Daether.net.mode=LOCAL \
com.aitusoftware.aether.net.Server /path/to/aether-net.properties
com.aitusoftware.aether.net.Server
```
27 changes: 19 additions & 8 deletions src/main/resources/monitor.js
Expand Up @@ -67,6 +67,7 @@ function renderStreamData(allData) {
var streamData = allData['streams'];
for (var channel in streamData) {
for (var streamId in streamData[channel]) {
var isIpcChannel = channel.indexOf('aeron:ipc') >= 0;
html += '<div class="channel row bottom-bar top-bar"><div class="stream col-md-12">' + channel + ' / ' + streamId + '</div></div>';
var publisherSet = streamData[channel][streamId];
for (var i = 0; i < publisherSet.length; i++) {
Expand All @@ -77,21 +78,31 @@ function renderStreamData(allData) {
html += pubStatRow('Session', publisher.sessionId, '', false);
html += pubStatRow('Publisher Position', publisher.publisherPosition, '', false);
html += pubStatRow('Publisher Limit', publisher.publisherLimit, '', false);
html += pubStatRow('Sender Position', publisher.senderPosition, '', false);
html += pubStatRow('Sender Limit', publisher.senderLimit, '', false);
html += pubStatRow('Back Pressure', publisher.backPressureEvents, bpeCls, false);
html += pubStatRow('Queued', publisher.sendBacklog, backlogCls, false);
html += pubStatRow('Remaining Buffer', publisher.remainingBuffer, '', true);
if (!isIpcChannel) {
html += pubStatRow('Sender Position', publisher.senderPosition, '', false);
html += pubStatRow('Sender Limit', publisher.senderLimit, '', false);
html += pubStatRow('Queued', publisher.sendBacklog, backlogCls, false);
}
html += pubStatRow('Remaining Buffer', publisher.remainingBuffer, '', false);
html += pubStatRow('Back Pressure', publisher.backPressureEvents, bpeCls, true);
var subscriberSet = publisher.subscribers;

for (var j = 0; j < subscriberSet.length; j++) {
var subscriber = subscriberSet[j];
html += '<div class="row"><div class="col-md-12"></div></div>';
html += subStatRow('Context', subscriber.label, '', false);
html += subStatRow('Receiver Position', subscriber.receiverPosition, '', false);
html += subStatRow('Receiver HWM', subscriber.receiverHighWaterMark, '', true);
if (!isIpcChannel) {
html += subStatRow('Receiver Position', subscriber.receiverPosition, '', false);
html += subStatRow('Receiver HWM', subscriber.receiverHighWaterMark, '', true);
}
for (var reg in subscriber.subscriberPositions) {
var available = (subscriber.receiverPosition - subscriber.subscriberPositions[reg]);
var available = 0;
if (isIpcChannel) {
available = Math.max(0, publisher.publisherPosition - subscriber.subscriberPositions[reg]);
} else {
available = Math.max(0, subscriber.receiverPosition - subscriber.subscriberPositions[reg]);
}

var cls = available > 0 ? 'sub-data-highlight' : '';
html += subStatRow('Subscriber Position', subscriber.subscriberPositions[reg], '', false);
html += subStatRow('Bytes Available', available, cls, true);
Expand Down
28 changes: 13 additions & 15 deletions src/test/java/com/aitusoftware/aether/net/IntegrationTest.java
Expand Up @@ -17,8 +17,17 @@
*/
package com.aitusoftware.aether.net;

import static com.google.common.truth.Truth.assertThat;
import static org.junit.jupiter.api.Assertions.fail;
import com.aitusoftware.aether.Aether;
import com.aitusoftware.aether.transport.CounterSnapshotPublisher;
import com.google.gson.Gson;
import io.aeron.Aeron;
import io.aeron.driver.MediaDriver;
import io.aeron.driver.ThreadingMode;
import org.agrona.CloseHelper;
import org.agrona.concurrent.SleepingMillisIdleStrategy;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.Closeable;
import java.io.IOException;
Expand All @@ -34,19 +43,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

import com.aitusoftware.aether.Aether;
import com.aitusoftware.aether.transport.CounterSnapshotPublisher;
import com.google.gson.Gson;

import org.agrona.CloseHelper;
import org.agrona.concurrent.SleepingMillisIdleStrategy;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import io.aeron.Aeron;
import io.aeron.driver.MediaDriver;
import io.aeron.driver.ThreadingMode;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.jupiter.api.Assertions.fail;

class IntegrationTest
{
Expand Down
8 changes: 7 additions & 1 deletion src/test/java/com/aitusoftware/aether/net/StubSystem.java
Expand Up @@ -39,6 +39,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

import static io.aeron.CommonContext.IPC_CHANNEL;

public final class StubSystem implements AutoCloseable
{
static final String CHANNEL_A = "aeron:udp?endpoint=localhost:54567";
Expand Down Expand Up @@ -92,13 +94,15 @@ public void run()
Publication pubA = clientAeron.addPublication(CHANNEL_A, STREAM_ID);
Publication pubB = clientAeron.addPublication(CHANNEL_B, STREAM_ID);
Publication pubC = clientAeron.addPublication(CHANNEL_C, STREAM_ID);
Publication ipcPub = clientAeron.addPublication(IPC_CHANNEL, STREAM_ID);
Subscription subA_0 = serverAeron.addSubscription(CHANNEL_A, STREAM_ID);
Subscription subA_1 = serverAeron.addSubscription(CHANNEL_A, STREAM_ID);
Subscription subA_2 = serverAeron.addSubscription(CHANNEL_A, STREAM_ID);
Subscription subB_0 = serverAeron.addSubscription(CHANNEL_B, STREAM_ID);
Subscription subB_1 = serverAeron.addSubscription(CHANNEL_B, STREAM_ID);
Subscription subC_0 = serverAeron.addSubscription(CHANNEL_C, STREAM_ID);
Subscription subC_1 = serverAeron.addSubscription(CHANNEL_C, STREAM_ID))
Subscription subC_1 = serverAeron.addSubscription(CHANNEL_C, STREAM_ID);
Subscription ipcSub = clientAeron.addSubscription(IPC_CHANNEL, STREAM_ID))
{
final Random random = ThreadLocalRandom.current();
final DirectBuffer payload = new UnsafeBuffer(PAYLOAD);
Expand All @@ -116,6 +120,7 @@ public void run()
if (rnd >= 2)
{
pubC.offer(payload, 0, PAYLOAD.length);
ipcPub.offer(payload, 0, PAYLOAD.length);
}
if (rnd >= 3)
{
Expand All @@ -140,6 +145,7 @@ public void run()
if (rnd >= 7)
{
subB_1.poll(assembler, random.nextInt(POLL_LIMIT));
ipcSub.poll(assembler, random.nextInt(POLL_LIMIT));
}
if (rnd >= 8)
{
Expand Down

0 comments on commit fadc5d4

Please sign in to comment.