Skip to content
Permalink
Browse files

Merge pull request #2507 from freimair/monitor

Monitor tweaks, fixes and a new Metric
  • Loading branch information...
ManfredKarrer committed Mar 7, 2019
2 parents dca9eff + 35914b8 commit 80fa5c029fffa9ea2e49d58bfc0a4e383cd26fa6
@@ -334,7 +334,6 @@ configure(project(':monitor')) {
}

dependencies {
compile project(':p2p')
compile project(':core')
compile "org.slf4j:slf4j-api:$slf4jVersion"
compile "ch.qos.logback:logback-core:$logbackVersion"
@@ -344,10 +343,6 @@ configure(project(':monitor')) {
compileOnly "org.projectlombok:lombok:$lombokVersion"
annotationProcessor "org.projectlombok:lombok:$lombokVersion"

compile('com.github.JesusMcCloud.netlayer:tor.native:0.6.2') {
exclude(module: 'slf4j-api')
}

testCompile 'org.junit.jupiter:junit-jupiter-api:5.3.2'
testCompile 'org.junit.jupiter:junit-jupiter-params:5.3.2'
testCompileOnly "org.projectlombok:lombok:$lombokVersion"
@@ -93,6 +93,10 @@ PriceNodeStats.enabled=true
PriceNodeStats.run.interval=42
PriceNodeStats.run.hosts=http://5bmpx76qllutpcyp.onion, http://xc3nh4juf2hshy7e.onion, http://44mgyoe2b6oqiytt.onion, http://62nvujg5iou3vu3i.onion, http://ceaanhbvluug4we6.onion
#MarketStats Metric
MarketStats.enabled=true
MarketStats.run.interval=191
## Reporters are configured via a set of properties as well.
##
## In contrast to Metrics, Reporters do not have a minimal set of properties.
@@ -3,10 +3,14 @@ Description=Bisq network monitor
After=network.target

[Service]
Type=exec
WorkingDirectory=~
Environment="JAVA_OPTS='-Xmx500M'"
ExecStart=/home/bisq/bisq/bisq-monitor /home/bisq/monitor.properties
ExecReload=/bin/kill -USR1 $MAINPID
Restart=on-failure

User=bisq
Group=bisq

[Install]
WantedBy=multi-user.target
@@ -17,6 +17,7 @@

package bisq.monitor;

import bisq.monitor.metric.MarketStats;
import bisq.monitor.metric.P2PMarketStats;
import bisq.monitor.metric.P2PNetworkLoad;
import bisq.monitor.metric.P2PSeedNodeSnapshot;
@@ -95,6 +96,7 @@ private void start() throws Throwable {
metrics.add(new P2PSeedNodeSnapshot(graphiteReporter));
metrics.add(new P2PMarketStats(graphiteReporter));
metrics.add(new PriceNodeStats(graphiteReporter));
metrics.add(new MarketStats(graphiteReporter));

// prepare configuration reload
// Note that this is most likely only work on Linux
@@ -57,8 +57,18 @@ protected Reporter() {
* Report our findings.
*
* @param values Map<metric name, metric value>
* @param prefix for example "bisq.torStartupTime"
* @param prefix for example "torStartupTime"
*/
public abstract void report(Map<String, String> values, String prefix);

/**
* Report our findings one by one.
*
* @param key the metric name
* @param value the value to report
* @param timestamp a unix timestamp in milliseconds
* @param prefix for example "torStartupTime"
*/
public abstract void report(String key, String value, String timestamp, String prefix);

}
@@ -0,0 +1,116 @@
/*
* This file is part of Bisq.
*
* bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with bisq. If not, see <http://www.gnu.org/licenses/>.
*/

package bisq.monitor.metric;

import bisq.monitor.Metric;
import bisq.monitor.Reporter;

import bisq.asset.Asset;
import bisq.asset.AssetRegistry;

import bisq.network.p2p.storage.payload.ProtectedStoragePayload;

import org.berndpruenster.netlayer.tor.TorCtlException;

import com.runjva.sourceforge.jsocks.protocol.SocksSocket;

import java.net.HttpURLConnection;
import java.net.Socket;
import java.net.URL;
import java.net.URLConnection;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import lombok.extern.slf4j.Slf4j;

/**
* Uses the markets API to retrieve market volume data.
*
* @author Florian Reimair
*
*/
@Slf4j
public class MarketStats extends Metric {

// poor mans JSON parser
private final Pattern marketPattern = Pattern.compile("\"market\" ?: ?\"([a-z_]+)\"");
private final Pattern amountPattern = Pattern.compile("\"amount\" ?: ?\"([\\d\\.]+)\"");
private final Pattern volumePattern = Pattern.compile("\"volume\" ?: ?\"([\\d\\.]+)\"");
private final Pattern timestampPattern = Pattern.compile("\"trade_date\" ?: ?([\\d]+)");

private final String marketApi = "https://markets.bisq.network";
private Long lastRun = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(15));

public MarketStats(Reporter reporter) {
super(reporter);
}

@Override
protected void execute() {
try {
// for each configured host
Map<String, String> result = new HashMap<>();

// assemble query
Long now = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
String query = "/api/trades?format=json&market=all&timestamp_from=" + lastRun + "&timestamp_to=" + now;
lastRun = now; // thought about adding 1 second but what if a trade is done exactly in this one second?

// connect
URLConnection connection = new URL(marketApi + query).openConnection();

// prepare to receive data
BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream()));

String all = in.readLine();
in.close();

Arrays.stream(all.substring(0, all.length() - 2).split("}")).forEach(trade -> {
Matcher market = marketPattern.matcher(trade);
Matcher amount = amountPattern.matcher(trade);
Matcher timestamp = timestampPattern.matcher(trade);
market.find();
if (market.group(1).endsWith("btc")) {
amount = volumePattern.matcher(trade);
}
amount.find();
timestamp.find();
System.err.println(getName() + ".volume." + market.group(1) + " " + amount.group(1) + " " + timestamp.group(1).substring(0, timestamp.group(1).length() - 3));
reporter.report("volume." + market.group(1), amount.group(1), timestamp.group(1), getName());
});
} catch (IllegalStateException ignore) {
// no match found
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
@@ -76,7 +76,7 @@
private static final String MAX_CONNECTIONS = "run.maxConnections";
private static final String HISTORY_SIZE = "run.historySize";
private NetworkNode networkNode;
private final File torHiddenServiceDir = new File("monitor/work/metric_p2pNetworkLoad");
private final File torHiddenServiceDir = new File("metric_" + getName());
private final ThreadGate hsReady = new ThreadGate();
private final Map<String, Counter> buckets = new ConcurrentHashMap<>();

@@ -61,7 +61,7 @@
private static final String HOSTS = "run.hosts";
private static final String TOR_PROXY_PORT = "run.torProxyPort";
private NetworkNode networkNode;
private final File torHiddenServiceDir = new File("monitor/work/metric_p2pRoundTripTime");
private final File torHiddenServiceDir = new File("metric_" + getName());
private int nonce;
private long start;
private List<Long> samples;
@@ -32,7 +32,6 @@
import bisq.network.p2p.network.Connection;
import bisq.network.p2p.network.MessageListener;
import bisq.network.p2p.network.NetworkNode;
import bisq.network.p2p.network.SetupListener;
import bisq.network.p2p.network.TorNetworkNode;
import bisq.network.p2p.peers.getdata.messages.GetDataResponse;
import bisq.network.p2p.peers.getdata.messages.PreliminaryGetDataRequest;
@@ -48,8 +47,6 @@

import java.net.MalformedURLException;

import java.io.File;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -76,17 +73,13 @@
*
*/
@Slf4j
public class P2PSeedNodeSnapshot extends Metric implements MessageListener, SetupListener {
public class P2PSeedNodeSnapshot extends Metric implements MessageListener {

private static final String HOSTS = "run.hosts";
private static final String TOR_PROXY_PORT = "run.torProxyPort";
Statistics statistics;
private NetworkNode networkNode;
private final File torHiddenServiceDir = new File("monitor/work/metric_" + this.getClass().getSimpleName());
private int nonce;
final Map<NodeAddress, Statistics> bucketsPerHost = new ConcurrentHashMap<>();
private final Set<byte[]> hashes = new TreeSet<>(Arrays::compare);
private final ThreadGate hsReady = new ThreadGate();
private final ThreadGate gate = new ThreadGate();

/**
@@ -161,20 +154,12 @@ public P2PSeedNodeSnapshot(Reporter reporter) {

@Override
protected void execute() {
// in case we do not have a NetworkNode up and running, we create one
if (null == networkNode) {
// prepare the gate
hsReady.engage();

// start the network node
networkNode = new TorNetworkNode(Integer.parseInt(configuration.getProperty(TOR_PROXY_PORT, "9054")),
new CoreNetworkProtoResolver(), false,
new AvailableTor(Monitor.TOR_WORKING_DIR, torHiddenServiceDir.getName()));
networkNode.start(this);

// wait for the HS to be published
hsReady.await();
}
// start the network node
final NetworkNode networkNode = new TorNetworkNode(Integer.parseInt(configuration.getProperty(TOR_PROXY_PORT, "9054")),
new CoreNetworkProtoResolver(), false,
new AvailableTor(Monitor.TOR_WORKING_DIR, "unused"));
// we do not need to start the networkNode, as we do not need the HS
//networkNode.start(this);

// clear our buckets
bucketsPerHost.clear();
@@ -189,9 +174,8 @@ protected void execute() {
NodeAddress target = OnionParser.getNodeAddress(current);

// do the data request
nonce = new Random().nextInt();
SettableFuture<Connection> future = networkNode.sendMessage(target,
new PreliminaryGetDataRequest(nonce, hashes));
new PreliminaryGetDataRequest(new Random().nextInt(), hashes));

Futures.addCallback(future, new FutureCallback<>() {
@Override
@@ -253,7 +237,7 @@ void report() {
statistics.values().forEach((messageType, count) -> {
try {
report.put(OnionParser.prettyPrint(host) + ".relativeNumberOfMessages." + messageType,
String.valueOf(referenceValues.get(messageType).value() - ((Counter) count).value()));
String.valueOf(((Counter) count).value() - referenceValues.get(messageType).value()));
} catch (MalformedURLException ignore) {
log.error("we should never got here");
}
@@ -321,22 +305,4 @@ public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) {
networkEnvelope.getClass().getSimpleName());
}
}

@Override
public void onTorNodeReady() {
}

@Override
public void onHiddenServicePublished() {
// open the gate
hsReady.proceed();
}

@Override
public void onSetupFailed(Throwable throwable) {
}

@Override
public void onRequestCustomBridges() {
}
}
@@ -18,6 +18,7 @@
package bisq.monitor.metric;

import bisq.monitor.Metric;
import bisq.monitor.Monitor;
import bisq.monitor.Reporter;
import bisq.monitor.ThreadGate;

@@ -38,7 +39,7 @@

private static final String SERVICE_PORT = "run.servicePort";
private static final String LOCAL_PORT = "run.localPort";
private final String hiddenServiceDirectory = "monitor/work/metric_" + getName();
private final String hiddenServiceDirectory = "metric_" + getName();
private final ThreadGate gate = new ThreadGate();

public TorHiddenServiceStartupTime(Reporter reporter) {
@@ -53,7 +54,7 @@ protected void execute() {
int servicePort = Integer.parseInt(configuration.getProperty(SERVICE_PORT, "9999"));

// clear directory so we get a new onion address every time
new File(hiddenServiceDirectory).delete();
new File(Monitor.TOR_WORKING_DIR + "/" + hiddenServiceDirectory).delete();

log.debug("creating the hidden service");

@@ -50,17 +50,20 @@ public void report(long value) {

@Override
public void report(Map<String, String> values, String prefix) {
long timestamp = System.currentTimeMillis();
String timestamp = String.valueOf(System.currentTimeMillis());
values.forEach((key, value) -> {
// https://graphite.readthedocs.io/en/latest/feeding-carbon.html
String report = "bisq" + (Version.getBaseCurrencyNetwork() != 0 ? "-" + BaseCurrencyNetwork.values()[Version.getBaseCurrencyNetwork()].getNetwork() : "")
+ (prefix.isEmpty() ? "" : "." + prefix)
+ (key.isEmpty() ? "" : "." + key)
+ " " + value + " " + timestamp + "\n";
System.err.println("Report: " + report);
report(key, value, timestamp, prefix);
});
}

@Override
public void report(String key, String value, String timestamp, String prefix) {
System.err.println("Report: bisq" + (Version.getBaseCurrencyNetwork() != 0 ? "-" + BaseCurrencyNetwork.values()[Version.getBaseCurrencyNetwork()].getNetwork() : "")
+ (prefix.isEmpty() ? "" : "." + prefix)
+ (key.isEmpty() ? "" : "." + key)
+ " " + value + " " + timestamp);
}

@Override
public void report(Map<String, String> values) {
report(values, "");

0 comments on commit 80fa5c0

Please sign in to comment.
You can’t perform that action at this time.