Skip to content

Commit

Permalink
0004183: Network Diagnostic additions to Bandwidth tests, testing upload
Browse files Browse the repository at this point in the history
and download using different sized payloads
  • Loading branch information
philipmarzullo64 committed Dec 4, 2019
1 parent 669a23d commit aedd694
Show file tree
Hide file tree
Showing 10 changed files with 326 additions and 15 deletions.
Expand Up @@ -350,7 +350,7 @@ protected void init() {
this.parameterService.setExtensionService(extensionService);
this.contextService = new ContextService(parameterService, symmetricDialect);

this.bandwidthService = new BandwidthService(parameterService);
this.bandwidthService = new BandwidthService(this);
this.sequenceService = new SequenceService(parameterService, symmetricDialect);
this.stagingManager = createStagingManager();
this.nodeService = new NodeService(this);
Expand Down
Expand Up @@ -20,6 +20,11 @@
*/
package org.jumpmind.symmetric.service;

import java.io.IOException;
import java.util.List;

import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.service.impl.BandwidthService;

/**
* A client service that determines bandwidth availability.
Expand All @@ -31,4 +36,9 @@ public interface IBandwidthService {

public double getDownloadKbpsFor(String url, long sampleSize, long maxTestDuration);

public double getUploadKbpsFor(Node remoteNode, Node localNode, long sampleSize, long maxTestDuration) throws IOException;

public List<BandwidthService.BandwidthResults> diagnoseDownloadBandwidth(Node localNode, Node remoteNode);

public List<BandwidthService.BandwidthResults> diagnoseUploadBandwidth(Node localNode, Node remoteNode);
}
Expand Up @@ -20,26 +20,45 @@
*/
package org.jumpmind.symmetric.service.impl;

import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.SocketTimeoutException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.model.NodeGroupLink;
import org.jumpmind.symmetric.model.NodeSecurity;
import org.jumpmind.symmetric.service.IBandwidthService;
import org.jumpmind.symmetric.service.IParameterService;
import org.jumpmind.symmetric.transport.BandwidthTestResults;
import org.jumpmind.symmetric.transport.IOutgoingWithResponseTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.databind.ObjectMapper;

/**
* @see IBandwidthService
*/
public class BandwidthService implements IBandwidthService {


public static final String Diagnostic_BandwidthFail = "Could not get Bandwidth";
public static final String Diagnostic_BandwidthFailure = "%s";

protected final Logger log = LoggerFactory.getLogger(getClass());

private ISymmetricEngine engine;

public BandwidthService(IParameterService parameterService) {
public BandwidthService(ISymmetricEngine engine) {
this.engine = engine;
}

public double getDownloadKbpsFor(String syncUrl, long sampleSize, long maxTestDuration) {
Expand All @@ -62,7 +81,7 @@ protected BandwidthTestResults getDownloadResultsFor(String syncUrl, long sample
InputStream is = null;
try {
BandwidthTestResults bw = new BandwidthTestResults();
URL u = new URL(String.format("%s/bandwidth?sampleSize=%s", syncUrl, sampleSize));
URL u = new URL(String.format("%s/bandwidth?direction=pull&sampleSize=%s", syncUrl, sampleSize));
bw.start();
HttpURLConnection conn = (HttpURLConnection) u.openConnection();

Expand All @@ -85,5 +104,184 @@ protected BandwidthTestResults getDownloadResultsFor(String syncUrl, long sample
}
}
}

public double getUploadKbpsFor(Node remoteNode, Node localNode, long sampleSize, long maxTestDuration) throws IOException {
double uploadSpeed = -1d;
try {
BandwidthTestResults bwtr = getUploadResultsFor(remoteNode, localNode, sampleSize, maxTestDuration);
uploadSpeed = bwtr.getKbps();
} catch(SocketTimeoutException e) {
log.error(e.getMessage(),e);
} catch(Exception e) {
log.error(e.getMessage(), e);
}

return uploadSpeed;
}

protected BandwidthTestResults getUploadResultsFor(Node remoteNode, Node localNode, long sampleSize, long maxTestDuration) throws IOException {
IOutgoingWithResponseTransport outgoing = null;
try {
Map<String, String> requestProperties = new HashMap<String, String>();
requestProperties.put("direction", "push");
NodeSecurity identitySecurity = engine.getNodeService().findNodeSecurity(localNode.getNodeId(), true);
outgoing =
engine.getTransportManager().getBandwidthPushTransport(
remoteNode, localNode, identitySecurity.getNodePassword(), requestProperties, engine.getParameterService().getRegistrationUrl());
long startTime = System.currentTimeMillis();
BufferedWriter writer = outgoing.openWriter();
String stringToWriter = "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789" +
"0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789" +
"0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789" +
"0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789" +
"0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789" +
"0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789" +
"0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789" +
"0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789" +
"0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789" +
"0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789";
for(long i = 0l; i < sampleSize;) {
writer.write(stringToWriter);
i += stringToWriter.length();
if(System.currentTimeMillis() - startTime > maxTestDuration) {
break;
}
}
String response = outgoing.readResponse().readLine();
ObjectMapper mapper = new ObjectMapper();
BandwidthTestResults results = mapper.readValue(response, BandwidthTestResults.class);
log.info("{} was calculated to have a upload bandwidth of {} kbps", remoteNode.getSyncUrl(), results.getKbps());
return results;
} finally {
outgoing.close();
}
}

public List<BandwidthService.BandwidthResults> diagnoseDownloadBandwidth(Node localNode, Node remoteNode) {
List<Long> downloadPayloadsList = new ArrayList<Long>();
List<BandwidthService.BandwidthResults> downloadBandwidthResultsList = new ArrayList<BandwidthService.BandwidthResults>();
String downloadPayloads = engine.getParameterService().getString("console.node.connection.diagnostic.download.bandwidth.payloads","");
if(downloadPayloads != null && downloadPayloads.length() > 0) {
for(String s : Arrays.asList(downloadPayloads.split(","))) {
downloadPayloadsList.add(Long.valueOf(s));
}
}

for(Long payload : downloadPayloadsList) {
BandwidthService.BandwidthResults bw = ((BandwidthService) engine.getBandwidthService()).new BandwidthResults();
bw.setPayloadSize(payload);
double dlSpeed = 0d;
if(isPullEnabled(localNode, remoteNode)) {
try {
dlSpeed = engine.getBandwidthService().getDownloadKbpsFor(remoteNode.getSyncUrl(), payload, 5000);
bw.setKbps(dlSpeed);
} catch(Exception e) {
bw.setFailure(true);
bw.setFailureMessage(Diagnostic_BandwidthFail);
bw.setException(e);
}
} else {
bw.setFailure(true);
bw.setFailureMessage("Pull is not enabled");
}
downloadBandwidthResultsList.add(bw);
}
return downloadBandwidthResultsList;
}

protected boolean isPullEnabled(Node localNode, Node remoteNode) {
List<NodeGroupLink> groupLinks = engine.getConfigurationService().getNodeGroupLinks(false);
for (NodeGroupLink link : groupLinks) {
if (link.getSourceNodeGroupId().equals(remoteNode.getNodeGroupId())
&& link.getTargetNodeGroupId().equals(localNode.getNodeGroupId())
&& link.getDataEventAction().getShortName().equals("pull")) {
return true;
}
}
return false;
}

public List<BandwidthService.BandwidthResults> diagnoseUploadBandwidth(Node localNode, Node remoteNode) {
List<Long> uploadPayloadsList = new ArrayList<Long>();
List<BandwidthService.BandwidthResults> uploadBandwidthResultsList = new ArrayList<BandwidthService.BandwidthResults>();
String uploadPayloads = engine.getParameterService().getString("console.node.connection.diagnostic.upload.bandwidth.payloads","");
if(uploadPayloads != null && uploadPayloads.length() > 0) {
for(String s : Arrays.asList(uploadPayloads.split(","))) {
uploadPayloadsList.add(Long.valueOf(s));
}
}

for(Long payload : uploadPayloadsList) {
BandwidthService.BandwidthResults bw = ((BandwidthService) engine.getBandwidthService()).new BandwidthResults();
bw.setPayloadSize(payload);
double dlSpeed = 0d;
if(isPushEnabled(localNode, remoteNode)) {
try {
dlSpeed = engine.getBandwidthService().getDownloadKbpsFor(remoteNode.getSyncUrl(), payload, 5000);
bw.setKbps(dlSpeed);
} catch(Exception e) {
bw.setFailure(true);
bw.setFailureMessage(BandwidthService.Diagnostic_BandwidthFail);
bw.setException(e);
}
} else {
bw.setFailure(true);
bw.setFailureMessage("Push is not enabled");
}
uploadBandwidthResultsList.add(bw);
}
return uploadBandwidthResultsList;
}

protected boolean isPushEnabled(Node localNode, Node remoteNode) {
List<NodeGroupLink> groupLinks = engine.getConfigurationService().getNodeGroupLinks(false);
for (NodeGroupLink link : groupLinks) {
if(link.getSourceNodeGroupId().equals(localNode.getNodeGroupId())
&& link.getTargetNodeGroupId().equals(remoteNode.getNodeGroupId())
&& link.getDataEventAction().getShortName().equals("push")) {
return true;
}
}
return false;
}


public class BandwidthResults {
private long payloadSize;
private double kbps;
private boolean failure = false;
private String failureMessage = null;
private Exception exception = null;
public Exception getException() {
return exception;
}
public void setException(Exception exception) {
this.exception = exception;
}
public boolean isFailure() {
return failure;
}
public void setFailure(boolean failure) {
this.failure = failure;
}
public String getFailureMessage() {
return failureMessage;
}
public void setFailureMessage(String failureMessage) {
this.failureMessage = failureMessage;
}
public long getPayloadSize() {
return payloadSize;
}
public void setPayloadSize(long payloadSize) {
this.payloadSize = payloadSize;
}
public double getKbps() {
return kbps;
}
public void setKbps(double kbps) {
this.kbps = kbps;
}
}

}
Expand Up @@ -50,7 +50,8 @@
public class BandwidthTestResults {
long start;
long total = 0;
long duration = 0;
long elapsed = 0;
double kbps;

/**
* Starts the bandwidth measurement.
Expand All @@ -73,7 +74,13 @@ public void transmitted(int n) {
* Ends the bandwidth measurement.
*/
public void stop() {
duration += System.currentTimeMillis() - start;
elapsed += System.currentTimeMillis() - start;
if (total == 0 || elapsed == 0) {
kbps = 0;
} else {
// convert to bits, then convert to kilobits, then divide by seconds
kbps = ((8.0d * total) / 1024.0d) / (elapsed / 1000.0d) ;
}
}

/**
Expand All @@ -82,11 +89,13 @@ public void stop() {
* @return Bandwidth in Kbps.
*/
public double getKbps() {
if (total == 0 || duration == 0) {
return 0;
} else {
return (8.0d * 1000.0d * total / 1024.0d) / duration;
}
// if (total == 0 || elapsed == 0) {
// return 0;
// } else {
// // convert to bits, then convert to kilobits, then divide by seconds
// return ((8.0d * total) / 1024.0d) / (elapsed / 1000.0d) ;
// }
return kbps;
}

/**
Expand All @@ -95,7 +104,16 @@ public double getKbps() {
* @return Number of milliseconds elapsed between start() and stop().
*/
public long getElapsed() {
return duration;
return elapsed;
}

/**
* Returns the number of bytes sent between start() and stop().
*
* @return Number of bytes sent between start() and stop().
*/
public long getTotal() {
return total;
}

}
Expand Up @@ -56,6 +56,9 @@ public IOutgoingWithResponseTransport getFilePushTransport(Node remote, Node loc

public IIncomingTransport getConfigTransport(Node remote, Node local, String securityToken,
String symmetricVersion, String configVersion, String registrationUrl) throws IOException;

public IOutgoingWithResponseTransport getBandwidthPushTransport(Node remote, Node local, String securityToken,
Map<String, String> requestProperties, String registrationUrl) throws IOException;

/**
* This is the proper way to determine the URL for a node. It delegates to configured
Expand Down
Expand Up @@ -302,6 +302,16 @@ public IIncomingTransport getRegisterTransport(Node node, String registrationUrl
return new HttpIncomingTransport(this, createGetConnectionFor(new URL(buildRegistrationUrl(
registrationUrl, node))), engine.getParameterService());
}

@Override
public IOutgoingWithResponseTransport getBandwidthPushTransport(Node remote, Node local, String securityToken,
Map<String, String> requestProperties, String registrationUrl) throws IOException
{
URL url = new URL(resolveURL(remote.getSyncUrl(), registrationUrl) + "/" + "bandwidth?direction=push");
return new HttpOutgoingTransport(this, url, getHttpTimeOutInMs(), isUseCompression(remote),
getCompressionStrategy(), getCompressionLevel(), local.getNodeId(),
securityToken, isOutputStreamEnabled(), getOutputStreamSize(), false, requestProperties);
}

public static String buildRegistrationUrl(String baseUrl, Node node) throws IOException {
if (baseUrl == null) {
Expand Down
Expand Up @@ -264,5 +264,10 @@ public IIncomingTransport getConfigTransport(Node remote, Node local, String sec
return null;
}


@Override
public IOutgoingWithResponseTransport getBandwidthPushTransport(Node remote, Node local, String securityToken, Map<String, String> requestProperties, String registrationUrl)
throws IOException {
return null;
}

}
Expand Up @@ -134,4 +134,10 @@ public IIncomingTransport getConfigTransport(Node remote, Node local, String sec
return null;
}

@Override
public IOutgoingWithResponseTransport getBandwidthPushTransport(Node remote, Node local, String securityToken, Map<String, String> requestProperties, String registrationUrl)
throws IOException {
return null;
}

}

0 comments on commit aedd694

Please sign in to comment.