From 05fcffd0d6d2a7641cf0ac13838e6561a819a1c2 Mon Sep 17 00:00:00 2001 From: dnck Date: Wed, 27 Mar 2019 13:41:13 +0100 Subject: [PATCH 01/15] fix error trace on node init from stat pub --- .../net/helix/sbx/service/stats/TransactionStatsPublisher.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/main/java/net/helix/sbx/service/stats/TransactionStatsPublisher.java b/src/main/java/net/helix/sbx/service/stats/TransactionStatsPublisher.java index 8a826e82..a95ea826 100644 --- a/src/main/java/net/helix/sbx/service/stats/TransactionStatsPublisher.java +++ b/src/main/java/net/helix/sbx/service/stats/TransactionStatsPublisher.java @@ -77,6 +77,8 @@ private Runnable getRunnable() { messageQ.publish(CONFIRMED_TRANSACTIONS_TOPIC + " %d", numConfirmed); messageQ.publish(TOTAL_TRANSACTIONS_TOPIC + " %d", numTransactions); + + } catch (Exception e) { log.error("Error while getting transaction counts : {}", e); } From 466d832b835d5d3e77e1d5b77bd9918ba2dbe0e1 Mon Sep 17 00:00:00 2001 From: dnck Date: Fri, 29 Mar 2019 15:16:22 +0100 Subject: [PATCH 02/15] of changes to init node --- src/main/java/net/helix/sbx/network/Node.java | 123 +----------------- .../stats/TransactionStatsPublisher.java | 9 +- 2 files changed, 6 insertions(+), 126 deletions(-) diff --git a/src/main/java/net/helix/sbx/network/Node.java b/src/main/java/net/helix/sbx/network/Node.java index e557d1cc..a427a6c4 100644 --- a/src/main/java/net/helix/sbx/network/Node.java +++ b/src/main/java/net/helix/sbx/network/Node.java @@ -1,5 +1,4 @@ package net.helix.sbx.network; - import net.helix.sbx.TransactionValidator; import net.helix.sbx.conf.NodeConfig; import net.helix.sbx.controllers.TipsViewModel; @@ -13,13 +12,11 @@ import net.helix.sbx.storage.Tangle; import net.helix.sbx.zmq.MessageQ; import net.helix.sbx.service.Graphstream; - import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import java.net.*; import java.nio.ByteBuffer; import java.security.MessageDigest; @@ -29,43 +26,32 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; - /** * Class Node is the core class for handling gossip protocol packets. * Both TCP and UDP receivers will pass incoming packets to this class's object. * It is also responsible for validating and storing the received transactions * into the Tangle Database.
- * * The Gossip protocol is specific to nodes and is used for spamming and requesting * new transactions between peers. Every message sent on Gossip protocol consists of two * parts - the transaction in binary encoded format followed by a hash of another transaction to * be requested. The receiving entity will save the newly received transaction into * its own database and will respond with the received requested transaction - if * available in its own storgage. - * */ public class Node { - private static final Logger log = LoggerFactory.getLogger(Node.class); private final int reqHashSize; - - private int BROADCAST_QUEUE_SIZE; private int RECV_QUEUE_SIZE; private int REPLY_QUEUE_SIZE; private static final int PAUSE_BETWEEN_TRANSACTIONS = 1; - private final AtomicBoolean shuttingDown = new AtomicBoolean(false); - private final List neighbors = new CopyOnWriteArrayList<>(); private final ConcurrentSkipListSet broadcastQueue = weightQueue(); private final ConcurrentSkipListSet> receiveQueue = weightQueueTxPair(); private final ConcurrentSkipListSet> replyQueue = weightQueueHashPair(); - - private final DatagramPacket sendingPacket; private final DatagramPacket tipRequestingPacket; - private final ExecutorService executor = Executors.newFixedThreadPool(5); private final NodeConfig configuration; private final Tangle tangle; @@ -76,26 +62,18 @@ public class Node { private final TransactionRequester transactionRequester; private final MessageQ messageQ; private Graphstream graph; - private static final SecureRandom rnd = new SecureRandom(); - - private FIFOCache recentSeenBytes; - private static AtomicLong recentSeenBytesMissCount = new AtomicLong(0L); private static AtomicLong recentSeenBytesHitCount = new AtomicLong(0L); - private static long sendLimit = -1; private static AtomicLong sendPacketsCounter = new AtomicLong(0L); private static AtomicLong sendPacketsTimer = new AtomicLong(0L); - public static final ConcurrentSkipListSet rejectedAddresses = new ConcurrentSkipListSet(); private DatagramSocket udpSocket; - /** * Constructs a Node class instance. The constructor is passed reference * of several other instances. - * * @param tangle An instance of the Tangle storage interface * @param snapshotProvider data provider for the snapshots that are relevant for the node * @param transactionValidator makes sure transaction is not malformed. @@ -104,7 +82,6 @@ public class Node { * @param latestMilestoneTracker Tracks milestones issued from the coordinator * @param messageQ Responsible for publishing events on zeroMQ * @param configuration Contains all the config. - * */ public Node(final Tangle tangle, SnapshotProvider snapshotProvider, final TransactionValidator transactionValidator, final TransactionRequester transactionRequester, final TipsViewModel tipsViewModel, final LatestMilestoneTracker latestMilestoneTracker, final MessageQ messageQ, final NodeConfig configuration, Graphstream graph ) { @@ -121,73 +98,55 @@ public Node(final Tangle tangle, SnapshotProvider snapshotProvider, final Transa this.sendingPacket = new DatagramPacket(new byte[packetSize], packetSize); this.tipRequestingPacket = new DatagramPacket(new byte[packetSize], packetSize); this.graph = graph; - } - /** * Intialize the operations by spawning all the worker threads. - * */ public void init() throws Exception { - - //TODO ask Alon sendLimit = (long) ((configuration.getSendLimit() * 1000000) / (configuration.getTransactionPacketSize() * 8)); - BROADCAST_QUEUE_SIZE = RECV_QUEUE_SIZE = REPLY_QUEUE_SIZE = configuration.getqSizeNode(); recentSeenBytes = new FIFOCache<>(configuration.getCacheSizeBytes(), configuration.getpDropCacheEntry()); - parseNeighborsConfig(); - executor.submit(spawnBroadcasterThread()); executor.submit(spawnTipRequesterThread()); executor.submit(spawnNeighborDNSRefresherThread()); executor.submit(spawnProcessReceivedThread()); executor.submit(spawnReplyToRequestThread()); - executor.shutdown(); } - /** * Keeps the passed UDP DatagramSocket reference from {@link UDPReceiver}. * This is currently only used in creating a new {@link UDPNeighbor}. - * * @param {@link DatagramSocket} socket created by UDPReceiver */ public void setUDPSocket(final DatagramSocket socket) { this.udpSocket = socket; } - /** * Returns the stored UDP DatagramSocket reference from {@link UDPReceiver}. - * * @return {@link DatagramSocket} socket created by UDPReceiver */ public DatagramSocket getUdpSocket() { return udpSocket; } - /** * Internal map used to keep track of neighbor's IP vs DNS name */ private final Map neighborIpCache = new HashMap<>(); - /** * One of the problem of dynamic DNS is neighbor could reconnect and get assigned * a new IP address. This thread periodically resovles the DNS to make sure * the IP is updated in the quickest possible manner. Doing it fast will increase * the detection of change - however will generate lot of unnecessary DNS outbound * traffic - so a balance is sought between speed and resource utilization. - * */ private Runnable spawnNeighborDNSRefresherThread() { return () -> { if (configuration.isDnsResolutionEnabled()) { log.info("Spawning Neighbor DNS Refresher Thread"); - while (!shuttingDown.get()) { int dnsCounter = 0; log.info("Checking Neighbors' Ip..."); - try { neighbors.forEach(n -> { final String hostname = n.getAddress().getHostName(); @@ -195,7 +154,6 @@ private Runnable spawnNeighborDNSRefresherThread() { log.info("DNS Checker: Validating DNS Address '{}' with '{}'", hostname, ip); messageQ.publish("dnscv %s %s", hostname, ip); final String neighborAddress = neighborIpCache.get(hostname); - if (neighborAddress == null) { neighborIpCache.put(hostname, ip); } else { @@ -208,10 +166,8 @@ private Runnable spawnNeighborDNSRefresherThread() { messageQ.publish("dnscu %s", hostname); String protocol = (n instanceof TCPNeighbor) ? "tcp://" : "udp://"; String port = ":" + n.getAddress().getPort(); - uri(protocol + hostname + port).ifPresent(uri -> { removeNeighbor(uri, n.isFlagged()); - uri(protocol + ip + port).ifPresent(nuri -> { Neighbor neighbor = newNeighbor(nuri, n.isFlagged()); addNeighbor(neighbor); @@ -225,7 +181,6 @@ private Runnable spawnNeighborDNSRefresherThread() { } }); }); - while (dnsCounter++ < 60 * 30 && !shuttingDown.get()) { Thread.sleep(1000); } @@ -239,37 +194,27 @@ private Runnable spawnNeighborDNSRefresherThread() { } }; } - /** * Checks whether the passed DNS is an IP address in string form or a DNS * hostname. - * * @return An IP address (decimal form) in string resolved from the given DNS - * */ private Optional checkIp(final String dnsName) { - if (StringUtils.isEmpty(dnsName)) { return Optional.empty(); } - InetAddress inetAddress; try { inetAddress = java.net.InetAddress.getByName(dnsName); } catch (UnknownHostException e) { return Optional.empty(); } - final String hostAddress = inetAddress.getHostAddress(); - if (StringUtils.equals(dnsName, hostAddress)) { // not a DNS... return Optional.empty(); } - return Optional.of(hostAddress); } - - /** * First Entry point for receiving any incoming transactions from TCP/UDP Receivers. * At this point, the transport protocol (UDP/TCP) is irrelevant. We check if we have @@ -277,18 +222,14 @@ private Optional checkIp(final String dnsName) { * comparing it against a saved hash set. If the packet is new, we construct * a {@link TransactionViewModel} object from it and perform some basic validation * on the received transaction via {@link TransactionValidator#runValidation} - * * The packet is then added to receiveQueue for further processing. */ - public void preProcessReceivedData(byte[] receivedData, SocketAddress senderAddress, String uriScheme) { TransactionViewModel receivedTransactionViewModel = null; Hash receivedTransactionHash = null; - boolean addressMatch = false; boolean cached = false; double pDropTransaction = configuration.getpDropTransaction(); - for (final Neighbor neighbor : getNeighbors()) { addressMatch = neighbor.matches(senderAddress); if (addressMatch) { @@ -299,30 +240,23 @@ public void preProcessReceivedData(byte[] receivedData, SocketAddress senderAddr break; } try { - //Transaction bytes ByteBuffer digest = getBytesDigest(receivedData); - //check if cached synchronized (recentSeenBytes) { cached = (receivedTransactionHash = recentSeenBytes.get(digest)) != null; } - if (!cached) { //if not, then validate receivedTransactionViewModel = new TransactionViewModel(receivedData, TransactionHash.calculate(receivedData, TransactionViewModel.SIZE, SpongeFactory.create(SpongeFactory.Mode.S256))); receivedTransactionHash = receivedTransactionViewModel.getHash(); transactionValidator.runValidation(receivedTransactionViewModel, transactionValidator.getMinWeightMagnitude()); - synchronized (recentSeenBytes) { recentSeenBytes.put(digest, receivedTransactionHash); } - //if valid - add to receive queue (receivedTransactionViewModel, neighbor) addReceivedDataToReceiveQueue(receivedTransactionViewModel, neighbor); - } - } catch (NoSuchAlgorithmException e) { log.error("MessageDigest: " + e); } catch (final TransactionValidator.StaleTimestampException e) { @@ -333,27 +267,21 @@ public void preProcessReceivedData(byte[] receivedData, SocketAddress senderAddr log.error(e1.getMessage()); } neighbor.incStaleTransactions(); - } catch (final RuntimeException e) { log.error(e.getMessage()); log.error("Received an Invalid TransactionViewModel. Dropping it..."); neighbor.incInvalidTransactions(); break; } - //Request bytes - //add request to reply queue (requestedHash, neighbor) Hash requestedHash = HashFactory.TRANSACTION.create(receivedData, TransactionViewModel.SIZE, reqHashSize); if (requestedHash.equals(receivedTransactionHash)) { //requesting a random tip requestedHash = Hash.NULL_HASH; } - addReceivedDataToReplyQueue(requestedHash, neighbor); - //recentSeenBytes statistics - if (log.isDebugEnabled()) { long hitCount, missCount; if (cached) { @@ -370,11 +298,9 @@ public void preProcessReceivedData(byte[] receivedData, SocketAddress senderAddr recentSeenBytesHitCount.set(0L); } } - break; } } - if (!addressMatch && configuration.isTestnet()) { int maxPeersAllowed = configuration.getMaxPeers(); String uriString = uriScheme + ":/" + senderAddress.toString(); @@ -404,7 +330,6 @@ public void preProcessReceivedData(byte[] receivedData, SocketAddress senderAddr } } } - /** * Adds incoming transactions to the {@link Node#receiveQueue} to be processed later. */ @@ -413,9 +338,7 @@ public void addReceivedDataToReceiveQueue(TransactionViewModel receivedTransacti if (receiveQueue.size() > RECV_QUEUE_SIZE) { receiveQueue.pollLast(); } - } - /** * Adds incoming transactions to the {@link Node#replyQueue} to be processed later */ @@ -425,7 +348,6 @@ public void addReceivedDataToReplyQueue(Hash requestedHash, Neighbor neighbor) { replyQueue.pollLast(); } } - /** * Picks up a transaction and neighbor pair from receive queue. Calls * {@link Node#processReceivedData} on the pair. @@ -436,7 +358,6 @@ public void processReceivedDataFromQueue() { processReceivedData(receivedData.getLeft(), receivedData.getRight()); } } - /** * Picks up a transaction hash and neighbor pair from reply queue. Calls * {@link Node#replyToRequest} on the pair. @@ -447,7 +368,6 @@ public void replyToRequestFromQueue() { replyToRequest(receivedData.getLeft(), receivedData.getRight()); } } - /** * This is second step of incoming transaction processing. The newly received * and validated transactions are stored in {@link Node#receiveQueue}. This function @@ -455,9 +375,7 @@ public void replyToRequestFromQueue() { * transaction is then added to the broadcast queue, to be fruther spammed to the neighbors. */ public void processReceivedData(TransactionViewModel receivedTransactionViewModel, Neighbor neighbor) { - boolean stored = false; - //store new transaction try { stored = receivedTransactionViewModel.store(tangle, snapshotProvider.getInitialSnapshot()); @@ -468,7 +386,6 @@ public void processReceivedData(TransactionViewModel receivedTransactionViewMode log.error("Error accessing persistence store.", e); neighbor.incInvalidTransactions(); } - //if new, then broadcast to all neighbors if (stored) { receivedTransactionViewModel.setArrivalTime(System.currentTimeMillis()); @@ -482,9 +399,7 @@ public void processReceivedData(TransactionViewModel receivedTransactionViewMode neighbor.incNewTransactions(); broadcast(receivedTransactionViewModel); } - } - /** * This is second step of incoming transaction processing. The newly received * and validated transactions are stored in {@link Node#receiveQueue}. This function @@ -492,10 +407,8 @@ public void processReceivedData(TransactionViewModel receivedTransactionViewMode * transaction is then added to the broadcast queue, to be fruther spammed to the neighbors. */ public void replyToRequest(Hash requestedHash, Neighbor neighbor) { - TransactionViewModel transactionViewModel = null; Hash transactionPointer; - //retrieve requested transaction if (requestedHash.equals(Hash.NULL_HASH)) { //Random Tip Request @@ -522,7 +435,6 @@ public void replyToRequest(Hash requestedHash, Neighbor neighbor) { log.error("Error while searching for transaction.", e); } } - if (transactionViewModel != null && transactionViewModel.getType() == TransactionViewModel.FILLED_SLOT) { //send hbytes back to neighbor try { @@ -548,14 +460,15 @@ public void replyToRequest(Hash requestedHash, Neighbor neighbor) { } } - } - + /** + * This function is used once by th replyToRequest method above. + * It returns a random, solid tip to send to the pulling node. + */ private Hash getRandomTipPointer() throws Exception { Hash tip = rnd.nextDouble() < configuration.getpSendMilestone() ? latestMilestoneTracker.getLatestMilestoneHash() : tipsViewModel.getRandomSolidTipHash(); return tip == null ? Hash.NULL_HASH : tip; } - /** * Sends a Datagram to the neighbour. Also appends a random hash request * to the outgoing packet. Note that this is only used for UDP handling. For TCP @@ -567,7 +480,6 @@ private Hash getRandomTipPointer() throws Exception { * */ public void sendPacket(DatagramPacket sendingPacket, TransactionViewModel transactionViewModel, Neighbor neighbor) throws Exception { - //limit amount of sends per second long now = System.currentTimeMillis(); if ((now - sendPacketsTimer.get()) > 1000L) { @@ -580,7 +492,6 @@ public void sendPacket(DatagramPacket sendingPacket, TransactionViewModel transa //log.info("exceeded limit - don't send - {}",sendPacketsCounter.get()); return; } - synchronized (sendingPacket) { System.arraycopy(transactionViewModel.getBytes(), 0, sendingPacket.getData(), 0, TransactionViewModel.SIZE); Hash hash = transactionRequester.transactionToRequest(rnd.nextDouble() < configuration.getpSelectMilestoneChild()); @@ -588,10 +499,8 @@ public void sendPacket(DatagramPacket sendingPacket, TransactionViewModel transa sendingPacket.getData(), TransactionViewModel.SIZE, reqHashSize); neighbor.send(sendingPacket); } - sendPacketsCounter.getAndIncrement(); } - /** * Does the same as {@link #sendPacket(DatagramPacket, TransactionViewModel, Neighbor)} but defaults to using the * same internal {@link #sendingPacket} as all the other methods in this class, which allows external callers to @@ -605,7 +514,6 @@ public void sendPacket(DatagramPacket sendingPacket, TransactionViewModel transa public void sendPacket(TransactionViewModel transactionViewModel, Neighbor neighbor) throws Exception { sendPacket(sendingPacket, transactionViewModel, neighbor); } - /** * This thread picks up a new transaction from the broadcast queue and * spams it to all of the neigbors. Sadly, this also includes the neigbor who @@ -614,11 +522,8 @@ public void sendPacket(TransactionViewModel transactionViewModel, Neighbor neigh */ private Runnable spawnBroadcasterThread() { return () -> { - log.info("Spawning Broadcaster Thread"); - while (!shuttingDown.get()) { - try { final TransactionViewModel transactionViewModel = broadcastQueue.pollFirst(); if (transactionViewModel != null) { @@ -639,27 +544,22 @@ private Runnable spawnBroadcasterThread() { log.info("Shutting down Broadcaster Thread"); }; } - /** * We send a tip request packet (transaction corresponding to the latest milestone) * to all of our neighbors periodically. */ private Runnable spawnTipRequesterThread() { return () -> { - log.info("Spawning Tips Requester Thread"); long lastTime = 0; while (!shuttingDown.get()) { - try { final TransactionViewModel transactionViewModel = TransactionViewModel.fromHash(tangle, latestMilestoneTracker.getLatestMilestoneHash()); System.arraycopy(transactionViewModel.getBytes(), 0, tipRequestingPacket.getData(), 0, TransactionViewModel.SIZE); System.arraycopy(transactionViewModel.getHash().bytes(), 0, tipRequestingPacket.getData(), TransactionViewModel.SIZE, reqHashSize); //Hash.SIZE_IN_BYTES); - neighbors.forEach(n -> n.send(tipRequestingPacket)); - long now = System.currentTimeMillis(); if ((now - lastTime) > 10000L) { lastTime = now; @@ -672,7 +572,6 @@ private Runnable spawnTipRequesterThread() { transactionRequester.numberOfTransactionsToRequest(), getReplyQueueSize(), TransactionViewModel.getNumberOfStoredTransactions(tangle)); } - Thread.sleep(5000); } catch (final Exception e) { log.error("Tips Requester Thread Exception:", e); @@ -684,11 +583,8 @@ private Runnable spawnTipRequesterThread() { private Runnable spawnProcessReceivedThread() { return () -> { - log.info("Spawning Process Received Data Thread"); - while (!shuttingDown.get()) { - try { processReceivedDataFromQueue(); Thread.sleep(1); @@ -702,11 +598,8 @@ private Runnable spawnProcessReceivedThread() { private Runnable spawnReplyToRequestThread() { return () -> { - log.info("Spawning Reply To Request Thread"); - while (!shuttingDown.get()) { - try { replyToRequestFromQueue(); Thread.sleep(1); @@ -718,7 +611,6 @@ private Runnable spawnReplyToRequestThread() { }; } - private static ConcurrentSkipListSet weightQueue() { return new ConcurrentSkipListSet<>((transaction1, transaction2) -> { if (transaction1.weightMagnitude == transaction2.weightMagnitude) { @@ -738,14 +630,12 @@ private static ConcurrentSkipListSet> weightQueueHashPair() return new ConcurrentSkipListSet>((transaction1, transaction2) -> { Hash tx1 = transaction1.getLeft(); Hash tx2 = transaction2.getLeft(); - for (int i = Hash.SIZE_IN_BYTES; i-- > 0; ) { if (tx1.bytes()[i] != tx2.bytes()[i]) { return tx2.bytes()[i] - tx1.bytes()[i]; } } return 0; - }); } @@ -753,7 +643,6 @@ private static ConcurrentSkipListSet> weigh return new ConcurrentSkipListSet>((transaction1, transaction2) -> { TransactionViewModel tx1 = transaction1.getLeft(); TransactionViewModel tx2 = transaction2.getLeft(); - if (tx1.weightMagnitude == tx2.weightMagnitude) { for (int i = Hash.SIZE_IN_BYTES; i-- > 0; ) { if (tx1.getHash().bytes()[i] != tx2.getHash().bytes()[i]) { @@ -766,7 +655,6 @@ private static ConcurrentSkipListSet> weigh }); } - public void broadcast(final TransactionViewModel transactionViewModel) { broadcastQueue.add(transactionViewModel); if (broadcastQueue.size() > BROADCAST_QUEUE_SIZE) { @@ -786,7 +674,6 @@ private ByteBuffer getBytesDigest(byte[] receivedData) throws NoSuchAlgorithmExc } // helpers methods - public boolean removeNeighbor(final URI uri, boolean isConfigured) { final Neighbor neighbor = newNeighbor(uri, isConfigured); if (uri.getScheme().equals("tcp")) { @@ -874,7 +761,6 @@ public int getReplyQueueSize() { } public class FIFOCache { - private final int capacity; private final double dropRate; private LinkedHashMap map; @@ -907,5 +793,4 @@ public V put(K key, V value) { return this.map.put(key, value); } } - } diff --git a/src/main/java/net/helix/sbx/service/stats/TransactionStatsPublisher.java b/src/main/java/net/helix/sbx/service/stats/TransactionStatsPublisher.java index a95ea826..53e9a5b1 100644 --- a/src/main/java/net/helix/sbx/service/stats/TransactionStatsPublisher.java +++ b/src/main/java/net/helix/sbx/service/stats/TransactionStatsPublisher.java @@ -49,7 +49,6 @@ public class TransactionStatsPublisher { public TransactionStatsPublisher(Tangle tangle, TipsViewModel tipsViewModel, TipSelector tipsSelector, MessageQ messageQ) { - this.tangle = tangle; this.tipsViewModel = tipsViewModel; this.tipsSelector = tipsSelector; @@ -78,7 +77,6 @@ private Runnable getRunnable() { messageQ.publish(CONFIRMED_TRANSACTIONS_TOPIC + " %d", numConfirmed); messageQ.publish(TOTAL_TRANSACTIONS_TOPIC + " %d", numTransactions); - } catch (Exception e) { log.error("Error while getting transaction counts : {}", e); } @@ -92,7 +90,6 @@ private Runnable getRunnable() { } private Hash getSuperTip() throws Exception { - // call the usual tip selection and return the first tip List tips = tipsSelector.getTransactionsToApprove(3, Optional.empty()); @@ -100,13 +97,11 @@ private Hash getSuperTip() throws Exception { } private long getConfirmedTransactionsCount(Instant now) throws Exception { - return approveeCounter.getCount(now, getSuperTip(), new HashSet<>()); } private long getAllTransactionsCount(Instant now) throws Exception { - - // count all transactions in a scalable way, by counting the approvees of all the tips + // count all transactions in a scalable way, by counting the parents of all the tips HashSet processedTransactions = new HashSet<>(); long count = 0; for (Hash tip : tipsViewModel.getTips()) { @@ -114,7 +109,7 @@ private long getAllTransactionsCount(Instant now) throws Exception { if (approveeCounter.isInTimeWindow(now, TransactionViewModel.fromHash(tangle, tip))) { count += 1 + approveeCounter.getCount(now, tip, processedTransactions); } else { - // even if the tip is not in the time window, count approvees that might be older + // even if the tip is not in the time window, count parents that might be older count += approveeCounter.getCount(now, tip, processedTransactions); } } From 358a19f34730b4befb29c17d8466588b5610470f Mon Sep 17 00:00:00 2001 From: dnck Date: Fri, 29 Mar 2019 15:20:05 +0100 Subject: [PATCH 03/15] addd gitignore for local --- .gitignore | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index b6721d79..ee5836dd 100644 --- a/.gitignore +++ b/.gitignore @@ -24,6 +24,8 @@ hs_err_pid* # targets and created dirs target target/* +out +out/* dependency-reduced-pom.xml effective.pom .DS_Store @@ -51,4 +53,4 @@ src/main/java/net/helix/sbx/localTest/* src/main/java/net/helix/sbx/localTest/ # private logs -*.log \ No newline at end of file +*.log From 8d96d421205745b0baf8e2a95509d203dd5e932c Mon Sep 17 00:00:00 2001 From: dnck Date: Fri, 10 May 2019 10:53:28 +0200 Subject: [PATCH 04/15] add filters to logback-save --- src/main/resources/logback-save.xml | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/src/main/resources/logback-save.xml b/src/main/resources/logback-save.xml index 5d04f8aa..c5425bf5 100644 --- a/src/main/resources/logback-save.xml +++ b/src/main/resources/logback-save.xml @@ -27,10 +27,31 @@ %d{MM/dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + ${log.name} System.out + + TRACE + ACCEPT + NEUTRAL + + + DEBUG + ACCEPT + NEUTRAL + + + INFO + ACCEPT + NEUTRAL + + + WARN + ACCEPT + DENY + %d{MM/dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n From 67f28f7a3f702e8955e2653d76cd71f39301a894 Mon Sep 17 00:00:00 2001 From: dnck Date: Thu, 16 May 2019 11:27:53 +0200 Subject: [PATCH 05/15] sync with upstream and add py scripts for debugging --- configure-many.py | 128 ++++++++++++++++++ removedb.py | 31 +++++ .../sbx/controllers/TransactionViewModel.java | 1 - 3 files changed, 159 insertions(+), 1 deletion(-) create mode 100644 configure-many.py create mode 100644 removedb.py diff --git a/configure-many.py b/configure-many.py new file mode 100644 index 00000000..9e4e802d --- /dev/null +++ b/configure-many.py @@ -0,0 +1,128 @@ +# -*- coding: utf-8 *-* +# Module: Topology & Neighbors +# This module will handle the static topology of the network +# and generate configuration files for the jars. +# Note that the term, "static", is important, since the overall goal +# should be to acheive a simulation that allows for node joining and +# leaving the network. +import shutil +import os +from subprocess import call +import argparse +import networkx as nx + +class SmallWorldTopology(): + def __init__(self, numNodes, averageNeighborsPerNode, probabilityAddingAdditionalNeighbors): + self.nNeighbors = averageNeighborsPerNode + self.pNewEdge = probabilityAddingAdditionalNeighbors + self.overlay = nx.watts_strogatz_graph(n=numNodes, k=self.nNeighbors, p=self.pNewEdge) + + def get_neighbor(self, nodeID): + if list(self.overlay.neighbors(nodeID)): + return list(self.overlay.neighbors(nodeID)) + + def get_matrix(self): + self.overlay_matrix = nx.to_numpy_matrix(self.overlay) + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description='') + parser.add_argument('-apiPortStart', metavar='Port to start iterating at', type=int, default=14000, help='') + parser.add_argument('-udpReceiverPortStart', metavar='UDP Receiver port to start iterating at', type=int, default=24000, help='') + parser.add_argument('-tcpReceiverPortStart', metavar='TCP Receiver port to start iterating at', type=int, default=34000, help='') + parser.add_argument('-zmqIpcStart', metavar='ZMQ ipc port to start iterating at', type=int, default=1, help='') + parser.add_argument('-zmqPortStart', metavar='ZMQ port to start iterating at', type=int, default=5550, help='') + parser.add_argument('-numNodes', metavar='Number of nodes.', type=int, default=10, help='') + args = parser.parse_args() + apiPortStart = args.apiPortStart + udpReceiverPortStart = args.udpReceiverPortStart + tcpReceiverPortStart = args.tcpReceiverPortStart + inet = "127.0.0.1" + tcpInetPort = udpReceiverPortStart + udpInetPort = udpReceiverPortStart + zmqIpcStart = args.zmqIpcStart + zmqPortStart = args.zmqPortStart + numNodes = args.numNodes + topology = SmallWorldTopology(numNodes, averageNeighborsPerNode=8, probabilityAddingAdditionalNeighbors=0.1) + for i in range(numNodes): + config = """#NODE {} CONFIG +API_PORT = {} +API_HOST = localhost +REMOTE_LIMIT_API = addNeighbors, getNeighbors, removeNeighbors, attachToTangle, interruptAttachingToTangle +MAX_FIND_TRANSACTIONS = 100_000 +MAX_REQUESTS_LIST = 1_000 +MAX_GET_BYTES = 10_000 +MAX_BODY_LENGTH = 1_000_000 +#REMOTE_AUTH = +MS_DELAY = 0 +UDP_RECEIVER_PORT = {} +TCP_RECEIVER_PORT = {} +NEIGHBORS +P_REMOVE_REQUEST = 0.01 +SEND_LIMIT = -1 +MAX_PEERS = 0 +DNS_REFRESHER_ENABLED = true +DNS_RESOLUTION_ENABLED = true +HXI_DIR = hxi +DB_PATH = mainnetdb +DB_LOG_PATH = mainnet.log +DB_CACHE_SIZE = 100_000 +ROCKS_DB = rocksdb +REVALIDATE = false +RESCAN_DB = false +P_REPLY_RANDOM_TIP = 0.66 +P_DROP_TRANSACTION = 0.0 +P_SELECT_MILESTONE_CHILD = 0.7 +P_SEND_MILESTONE = 0.02 +P_PROPAGATE_REQUEST = 0.01 +MWM = 1 +PACKET_SIZE = 1200 +REQ_HASH_SIZE = 32 +QUEUE_SIZE = 1_000 +P_DROP_CACHE_ENTRY = 0.02 +CACHE_SIZE_BYTES = 150_000 +ZMQ_THREADS = 1 +ZMQ_IPC = ipc://sbx/feeds/{} +ZMQ_ENABLED = false +ZMQ_PORT = {} +GRAPH_ENABLED = false +MAX_DEPTH = 15 +ALPHA = 0.001 +TIP_SOLIDIFIER_ENABLED = true +POW_THREADS = 0 +COORDINATOR_ADDRESS = 6a8413edc634e948e3446806afde11b17e0e188faf80a59a8b1147a0600cc5db +LOCAL_SNAPSHOTS_ENABLED = true +LOCAL_SNAPSHOTS_PRUNING_ENABLED = true +LOCAL_SNAPSHOTS_PRUNING_DELAY = 50000 +LOCAL_SNAPSHOTS_INTERVAL_SYNCED = 10 +LOCAL_SNAPSHOTS_INTERVAL_UNSYNCED = 1000 +LOCAL_SNAPSHOTS_BASE_PATH = mainnet +LOCAL_SNAPSHOTS_DEPTH = 100 +SNAPSHOT_FILE = /snapshotMainnet.txt +SNAPSHOT_SIG_FILE = /snapshotMainnet.sig +PREVIOUS_EPOCHS_SPENT_ADDRESSES_TXT = /previousEpochsSpentAddresses.txt +PREVIOUS_EPOCHS_SPENT_ADDRESSES_SIG = /previousEpochsSpentAddresses.sig +GLOBAL_SNAPSHOT_TIME = 1522235533 +MILESTONE_START_INDEX = 0 +NUM_KEYS_IN_MILESTONE = 10 +MAX_ANALYZED_TXS = 20_000 +SAVELOG = true +SAVELOG_BASE_PATH = logs/ +SAVELOG_XML_FILE = /logback-save.xml +""".format(i, apiPortStart,udpReceiverPortStart,tcpReceiverPortStart,zmqIpcStart, zmqPortStart) + nodeNeighbors = "" + for n in topology.get_neighbor(i): + nodeNeighbors = nodeNeighbors + "tcp://{}:{},".format(inet, tcpInetPort+n) + nodeNeighbors = nodeNeighbors[:-1] + config = config.replace("NEIGHBORS", "NEIGHBORS = {}".format(nodeNeighbors)) + apiPortStart +=1 + udpReceiverPortStart +=1 + tcpReceiverPortStart +=1 + zmqIpcStart +=1 + zmqPortStart +=1 + numNodes +=1 + if os.path.isdir(os.path.normpath("./many_configs")): + pass + else: + os.mkdir(os.path.normpath("./many_configs")) + with open(os.path.normpath("./many_configs/config{}.ini".format(i)), "w") as f: + f.write(config) diff --git a/removedb.py b/removedb.py new file mode 100644 index 00000000..8d614efb --- /dev/null +++ b/removedb.py @@ -0,0 +1,31 @@ +import shutil +import os +from subprocess import call +import argparse + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description='Remove the databases from the node to start fresh') + parser.add_argument('-all', metavar='Remove everything', type=bool, default=True, help='Remove everything including the snapshot stuff') + parser.add_argument('-compile', metavar='run mvn clean compile package', type=bool, default=False, help='run compile project after removing stuff') + parser.add_argument('-cpr', metavar='copy and paste the src and target of a compiled version here', type=bool, default=False, help='cp -r dir1 -> pwd') + args = parser.parse_args() + dbs = ['db', 'hxi', 'spent-addresses-log', 'spent-addresses-db', 'mainnet.log', 'mainnetdb'] + fs = ['mainnet.snapshot.meta', 'mainnet.snapshot.state', 'mainnet.snapshot.meta.bkp', 'mainnet.snapshot.state.bkp'] + for dir in dbs: + try: + shutil.rmtree(dir) + except: + print("{} does not exist".format(dir)) + if args.all: + for f in fs: + try: + os.remove(f) + except: + print("{} does not exist".format(f)) + if args.compile: + call('mvn clean compile package', shell=True) + if args.cpr: + shutil.rmtree('src') + shutil.rmtree('target') + call('cp -r /home/hlx-dev/helix/testnet/fork0/testnet-1.0/src /home/hlx-dev/helix/testnet/fork1/testnet-1.0/', shell=True) + call('cp -r /home/hlx-dev/helix/testnet/fork0/testnet-1.0/target /home/hlx-dev/helix/testnet/fork1/testnet-1.0/', shell=True) diff --git a/src/main/java/net/helix/sbx/controllers/TransactionViewModel.java b/src/main/java/net/helix/sbx/controllers/TransactionViewModel.java index 1ce32966..46ffdaf4 100644 --- a/src/main/java/net/helix/sbx/controllers/TransactionViewModel.java +++ b/src/main/java/net/helix/sbx/controllers/TransactionViewModel.java @@ -119,7 +119,6 @@ public static boolean mightExist(Tangle tangle, Hash hash) throws Exception { public TransactionViewModel(final Transaction transaction, final Hash hash) { this.transaction = transaction == null || transaction.bytes == null ? new Transaction(): transaction; this.hash = hash == null ? Hash.NULL_HASH: hash; - // depends on trailing or leading // weightMagnitude = this.hash.trailingZeros(); weightMagnitude = this.hash.leadingZeros(); From 00a35e3473df2d07642ec0fac83ac1bb42221ef1 Mon Sep 17 00:00:00 2001 From: dnck Date: Fri, 28 Jun 2019 11:43:25 +0200 Subject: [PATCH 06/15] format and style to match upstream --- .gitignore | 6 +- .../hlx/controllers/TransactionViewModel.java | 1 + src/main/java/net/helix/hlx/network/Node.java | 124 +++++++++++++++++- .../stats/TransactionStatsPublisher.java | 10 -- src/main/resources/logback-save.xml | 1 - 5 files changed, 119 insertions(+), 23 deletions(-) diff --git a/.gitignore b/.gitignore index 81328ad0..a25a3539 100644 --- a/.gitignore +++ b/.gitignore @@ -24,8 +24,6 @@ hs_err_pid* # targets and created dirs target target/* -out -out/* dependency-reduced-pom.xml effective.pom .classpath @@ -63,7 +61,5 @@ modules/* *.key *.pem - # os -.DS_Store - +.DS_Store \ No newline at end of file diff --git a/src/main/java/net/helix/hlx/controllers/TransactionViewModel.java b/src/main/java/net/helix/hlx/controllers/TransactionViewModel.java index cf4bc098..2f9fb8bf 100644 --- a/src/main/java/net/helix/hlx/controllers/TransactionViewModel.java +++ b/src/main/java/net/helix/hlx/controllers/TransactionViewModel.java @@ -117,6 +117,7 @@ public static boolean mightExist(Tangle tangle, Hash hash) throws Exception { public TransactionViewModel(final Transaction transaction, final Hash hash) { this.transaction = transaction == null || transaction.bytes == null ? new Transaction(): transaction; this.hash = hash == null ? Hash.NULL_HASH: hash; + // depends on trailing or leading // weightMagnitude = this.hash.trailingZeros(); weightMagnitude = this.hash.leadingZeros(); diff --git a/src/main/java/net/helix/hlx/network/Node.java b/src/main/java/net/helix/hlx/network/Node.java index f1f1e58e..f3b490dc 100644 --- a/src/main/java/net/helix/hlx/network/Node.java +++ b/src/main/java/net/helix/hlx/network/Node.java @@ -21,6 +21,7 @@ import org.bouncycastle.util.encoders.Hex; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import java.net.*; import java.nio.ByteBuffer; import java.security.MessageDigest; @@ -30,32 +31,43 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; + /** * Class Node is the core class for handling gossip protocol packets. * Both TCP and UDP receivers will pass incoming packets to this class's object. * It is also responsible for validating and storing the received transactions * into the Tangle Database.
+ * * The Gossip protocol is specific to nodes and is used for spamming and requesting * new transactions between peers. Every message sent on Gossip protocol consists of two * parts - the transaction in binary encoded format followed by a hash of another transaction to * be requested. The receiving entity will save the newly received transaction into * its own database and will respond with the received requested transaction - if * available in its own storgage. + * */ public class Node { + private static final Logger log = LoggerFactory.getLogger(Node.class); private final int reqHashSize; + + private int BROADCAST_QUEUE_SIZE; private int RECV_QUEUE_SIZE; private int REPLY_QUEUE_SIZE; private static final int PAUSE_BETWEEN_TRANSACTIONS = 1; + private final AtomicBoolean shuttingDown = new AtomicBoolean(false); + private final List neighbors = new CopyOnWriteArrayList<>(); private final ConcurrentSkipListSet broadcastQueue = weightQueue(); private final ConcurrentSkipListSet> receiveQueue = weightQueueTxPair(); private final ConcurrentSkipListSet> replyQueue = weightQueueHashPair(); + + private final DatagramPacket sendingPacket; private final DatagramPacket tipRequestingPacket; + private final ExecutorService executor = Executors.newFixedThreadPool(5); private final NodeConfig configuration; private final Tangle tangle; @@ -65,18 +77,26 @@ public class Node { private final LatestMilestoneTracker latestMilestoneTracker; private final TransactionRequester transactionRequester; private Graphstream graph; + private static final SecureRandom rnd = new SecureRandom(); + + private FIFOCache recentSeenBytes; + private static AtomicLong recentSeenBytesMissCount = new AtomicLong(0L); private static AtomicLong recentSeenBytesHitCount = new AtomicLong(0L); + private static long sendLimit = -1; private static AtomicLong sendPacketsCounter = new AtomicLong(0L); private static AtomicLong sendPacketsTimer = new AtomicLong(0L); + public static final ConcurrentSkipListSet rejectedAddresses = new ConcurrentSkipListSet(); private DatagramSocket udpSocket; + /** * Constructs a Node class instance. The constructor is passed reference * of several other instances. + * * @param tangle An instance of the Tangle storage interface * @param snapshotProvider data provider for the snapshots that are relevant for the node * @param transactionValidator makes sure transaction is not malformed. @@ -84,6 +104,7 @@ public class Node { * @param tipsViewModel Contains a hash of solid and non solid tips * @param latestMilestoneTracker Tracks milestones issued from the coordinator * @param configuration Contains all the config. + * */ public Node(final Tangle tangle, SnapshotProvider snapshotProvider, final TransactionValidator transactionValidator, final TransactionRequester transactionRequester, final TipsViewModel tipsViewModel, final LatestMilestoneTracker latestMilestoneTracker, final NodeConfig configuration, Graphstream graph ) { @@ -99,56 +120,73 @@ public Node(final Tangle tangle, SnapshotProvider snapshotProvider, final Transa this.sendingPacket = new DatagramPacket(new byte[packetSize], packetSize); this.tipRequestingPacket = new DatagramPacket(new byte[packetSize], packetSize); this.graph = graph; + } + /** * Intialize the operations by spawning all the worker threads. + * */ public void init() throws Exception { + + //TODO ask Alon sendLimit = (long) ((configuration.getSendLimit() * 1000000) / (configuration.getTransactionPacketSize() * 8)); + BROADCAST_QUEUE_SIZE = RECV_QUEUE_SIZE = REPLY_QUEUE_SIZE = configuration.getqSizeNode(); recentSeenBytes = new FIFOCache<>(configuration.getCacheSizeBytes(), configuration.getpDropCacheEntry()); - recentSeenBytes = new FIFOCache<>(configuration.getCacheSizeBytes(), configuration.getpDropCacheEntry()); + parseNeighborsConfig(); + executor.submit(spawnBroadcasterThread()); executor.submit(spawnTipRequesterThread()); executor.submit(spawnNeighborDNSRefresherThread()); executor.submit(spawnProcessReceivedThread()); executor.submit(spawnReplyToRequestThread()); + executor.shutdown(); } + /** * Keeps the passed UDP DatagramSocket reference from {@link UDPReceiver}. * This is currently only used in creating a new {@link UDPNeighbor}. + * * @param {@link DatagramSocket} socket created by UDPReceiver */ public void setUDPSocket(final DatagramSocket socket) { this.udpSocket = socket; } + /** * Returns the stored UDP DatagramSocket reference from {@link UDPReceiver}. + * * @return {@link DatagramSocket} socket created by UDPReceiver */ public DatagramSocket getUdpSocket() { return udpSocket; } + /** * Internal map used to keep track of neighbor's IP vs DNS name */ private final Map neighborIpCache = new HashMap<>(); + /** * One of the problem of dynamic DNS is neighbor could reconnect and get assigned * a new IP address. This thread periodically resovles the DNS to make sure * the IP is updated in the quickest possible manner. Doing it fast will increase * the detection of change - however will generate lot of unnecessary DNS outbound * traffic - so a balance is sought between speed and resource utilization. + * */ Runnable spawnNeighborDNSRefresherThread() { return () -> { if (configuration.isDnsResolutionEnabled()) { log.info("Spawning Neighbor DNS Refresher Thread"); + while (!shuttingDown.get()) { int dnsCounter = 0; log.info("Checking Neighbors' Ip..."); + try { neighbors.forEach(n -> { final String hostname = n.getAddress().getHostName(); @@ -156,6 +194,7 @@ Runnable spawnNeighborDNSRefresherThread() { log.info("DNS Checker: Validating DNS Address '{}' with '{}'", hostname, ip); tangle.publish("dnscv %s %s", hostname, ip); final String neighborAddress = neighborIpCache.get(hostname); + if (neighborAddress == null) { neighborIpCache.put(hostname, ip); } else { @@ -168,8 +207,10 @@ Runnable spawnNeighborDNSRefresherThread() { tangle.publish("dnscu %s", hostname); String protocol = (n instanceof TCPNeighbor) ? "tcp://" : "udp://"; String port = ":" + n.getAddress().getPort(); + uri(protocol + hostname + port).ifPresent(uri -> { removeNeighbor(uri, n.isFlagged()); + uri(protocol + ip + port).ifPresent(nuri -> { Neighbor neighbor = newNeighbor(nuri, n.isFlagged()); addNeighbor(neighbor); @@ -183,6 +224,7 @@ Runnable spawnNeighborDNSRefresherThread() { } }); }); + while (dnsCounter++ < 60 * 30 && !shuttingDown.get()) { Thread.sleep(1000); } @@ -196,27 +238,37 @@ Runnable spawnNeighborDNSRefresherThread() { } }; } + /** * Checks whether the passed DNS is an IP address in string form or a DNS * hostname. + * * @return An IP address (decimal form) in string resolved from the given DNS + * */ private Optional checkIp(final String dnsName) { + if (StringUtils.isEmpty(dnsName)) { return Optional.empty(); } + InetAddress inetAddress; try { inetAddress = java.net.InetAddress.getByName(dnsName); } catch (UnknownHostException e) { return Optional.empty(); } + final String hostAddress = inetAddress.getHostAddress(); + if (StringUtils.equals(dnsName, hostAddress)) { // not a DNS... return Optional.empty(); } + return Optional.of(hostAddress); } + + /** * First Entry point for receiving any incoming transactions from TCP/UDP Receivers. * At this point, the transport protocol (UDP/TCP) is irrelevant. We check if we have @@ -224,14 +276,18 @@ private Optional checkIp(final String dnsName) { * comparing it against a saved hash set. If the packet is new, we construct * a {@link TransactionViewModel} object from it and perform some basic validation * on the received transaction via {@link TransactionValidator#runValidation} + * * The packet is then added to receiveQueue for further processing. */ + public void preProcessReceivedData(byte[] receivedData, SocketAddress senderAddress, String uriScheme) { TransactionViewModel receivedTransactionViewModel = null; Hash receivedTransactionHash = null; + boolean addressMatch = false; boolean cached = false; double pDropTransaction = configuration.getpDropTransaction(); + for (final Neighbor neighbor : getNeighbors()) { addressMatch = neighbor.matches(senderAddress); if (addressMatch) { @@ -242,25 +298,30 @@ public void preProcessReceivedData(byte[] receivedData, SocketAddress senderAddr break; } try { + //Transaction bytes ByteBuffer digest = getBytesDigest(receivedData); + //check if cached synchronized (recentSeenBytes) { cached = (receivedTransactionHash = recentSeenBytes.get(digest)) != null; } //if not cached, then validate - if (!cached) { receivedTransactionViewModel = new TransactionViewModel(receivedData, TransactionHash.calculate(receivedData, TransactionViewModel.SIZE, SpongeFactory.create(SpongeFactory.Mode.S256))); receivedTransactionHash = receivedTransactionViewModel.getHash(); transactionValidator.runValidation(receivedTransactionViewModel, transactionValidator.getMinWeightMagnitude()); + synchronized (recentSeenBytes) { recentSeenBytes.put(digest, receivedTransactionHash); } + //if valid - add to receive queue (receivedTransactionViewModel, neighbor) addReceivedDataToReceiveQueue(receivedTransactionViewModel, neighbor); + } + } catch (NoSuchAlgorithmException e) { log.error("MessageDigest: " + e); } catch (final TransactionValidator.StaleTimestampException e) { @@ -271,21 +332,27 @@ public void preProcessReceivedData(byte[] receivedData, SocketAddress senderAddr log.error(e1.getMessage()); } neighbor.incStaleTransactions(); + } catch (final RuntimeException e) { log.error(e.getMessage()); log.error("Received an Invalid TransactionViewModel. Dropping it..."); neighbor.incInvalidTransactions(); break; } + //Request bytes + //add request to reply queue (requestedHash, neighbor) Hash requestedHash = HashFactory.TRANSACTION.create(receivedData, TransactionViewModel.SIZE, reqHashSize); if (requestedHash.equals(receivedTransactionHash)) { //requesting a random tip requestedHash = Hash.NULL_HASH; } + addReceivedDataToReplyQueue(requestedHash, neighbor); + //recentSeenBytes statistics + if (log.isDebugEnabled()) { long hitCount, missCount; if (cached) { @@ -302,9 +369,11 @@ public void preProcessReceivedData(byte[] receivedData, SocketAddress senderAddr recentSeenBytesHitCount.set(0L); } } + break; } } + if (!addressMatch && configuration.isTestnet()) { int maxPeersAllowed = configuration.getMaxPeers(); String uriString = uriScheme + ":/" + senderAddress.toString(); @@ -334,6 +403,7 @@ public void preProcessReceivedData(byte[] receivedData, SocketAddress senderAddr } } } + /** * Adds incoming transactions to the {@link Node#receiveQueue} to be processed later. */ @@ -342,7 +412,9 @@ public void addReceivedDataToReceiveQueue(TransactionViewModel receivedTransacti if (receiveQueue.size() > RECV_QUEUE_SIZE) { receiveQueue.pollLast(); } + } + /** * Adds incoming transactions to the {@link Node#replyQueue} to be processed later */ @@ -352,6 +424,7 @@ public void addReceivedDataToReplyQueue(Hash requestedHash, Neighbor neighbor) { replyQueue.pollLast(); } } + /** * Picks up a transaction and neighbor pair from receive queue. Calls * {@link Node#processReceivedData} on the pair. @@ -362,6 +435,7 @@ public void processReceivedDataFromQueue() { processReceivedData(receivedData.getLeft(), receivedData.getRight()); } } + /** * Picks up a transaction hash and neighbor pair from reply queue. Calls * {@link Node#replyToRequest} on the pair. @@ -372,6 +446,7 @@ public void replyToRequestFromQueue() { replyToRequest(receivedData.getLeft(), receivedData.getRight()); } } + /** * This is second step of incoming transaction processing. The newly received * and validated transactions are stored in {@link Node#receiveQueue}. This function @@ -379,7 +454,9 @@ public void replyToRequestFromQueue() { * transaction is then added to the broadcast queue, to be fruther spammed to the neighbors. */ public void processReceivedData(TransactionViewModel receivedTransactionViewModel, Neighbor neighbor) { + boolean stored = false; + //store new transaction try { stored = receivedTransactionViewModel.store(tangle, snapshotProvider.getInitialSnapshot()); @@ -390,6 +467,7 @@ public void processReceivedData(TransactionViewModel receivedTransactionViewMode log.error("Error accessing persistence store.", e); neighbor.incInvalidTransactions(); } + //if new, then broadcast to all neighbors if (stored) { receivedTransactionViewModel.setArrivalTime(System.currentTimeMillis()/1000L); @@ -402,6 +480,7 @@ public void processReceivedData(TransactionViewModel receivedTransactionViewMode } neighbor.incNewTransactions(); broadcast(receivedTransactionViewModel); + //zmq try { BundleViewModel receivedBundle = BundleViewModel.load(tangle, receivedTransactionViewModel.getBundleHash()); @@ -428,6 +507,7 @@ public void processReceivedData(TransactionViewModel receivedTransactionViewMode } } } + /** * This is second step of incoming transaction processing. The newly received * and validated transactions are stored in {@link Node#receiveQueue}. This function @@ -435,8 +515,10 @@ public void processReceivedData(TransactionViewModel receivedTransactionViewMode * transaction is then added to the broadcast queue, to be fruther spammed to the neighbors. */ public void replyToRequest(Hash requestedHash, Neighbor neighbor) { + TransactionViewModel transactionViewModel = null; Hash transactionPointer; + //retrieve requested transaction if (requestedHash.equals(Hash.NULL_HASH)) { //Random Tip Request @@ -463,6 +545,7 @@ public void replyToRequest(Hash requestedHash, Neighbor neighbor) { log.error("Error while searching for transaction.", e); } } + if (transactionViewModel != null && transactionViewModel.getType() == TransactionViewModel.FILLED_SLOT) { //send hbytes back to neighbor try { @@ -488,15 +571,14 @@ public void replyToRequest(Hash requestedHash, Neighbor neighbor) { } } + } - /** - * This function is used once by th replyToRequest method above. - * It returns a random, solid tip to send to the pulling node. - */ + private Hash getRandomTipPointer() throws Exception { Hash tip = rnd.nextDouble() < configuration.getpSendMilestone() ? latestMilestoneTracker.getLatestMilestoneHash() : tipsViewModel.getRandomSolidTipHash(); return tip == null ? Hash.NULL_HASH : tip; } + /** * Sends a Datagram to the neighbour. Also appends a random hash request * to the outgoing packet. Note that this is only used for UDP handling. For TCP @@ -508,6 +590,7 @@ private Hash getRandomTipPointer() throws Exception { * */ public void sendPacket(DatagramPacket sendingPacket, TransactionViewModel transactionViewModel, Neighbor neighbor) throws Exception { + //limit amount of sends per second long now = System.currentTimeMillis(); if ((now - sendPacketsTimer.get()) > 1000L) { @@ -520,6 +603,7 @@ public void sendPacket(DatagramPacket sendingPacket, TransactionViewModel transa //log.info("exceeded limit - don't send - {}",sendPacketsCounter.get()); return; } + synchronized (sendingPacket) { System.arraycopy(transactionViewModel.getBytes(), 0, sendingPacket.getData(), 0, TransactionViewModel.SIZE); Hash hash = transactionRequester.transactionToRequest(rnd.nextDouble() < configuration.getpSelectMilestoneChild()); @@ -527,8 +611,10 @@ public void sendPacket(DatagramPacket sendingPacket, TransactionViewModel transa sendingPacket.getData(), TransactionViewModel.SIZE, reqHashSize); neighbor.send(sendingPacket); } + sendPacketsCounter.getAndIncrement(); } + /** * Does the same as {@link #sendPacket(DatagramPacket, TransactionViewModel, Neighbor)} but defaults to using the * same internal {@link #sendingPacket} as all the other methods in this class, which allows external callers to @@ -542,6 +628,7 @@ public void sendPacket(DatagramPacket sendingPacket, TransactionViewModel transa public void sendPacket(TransactionViewModel transactionViewModel, Neighbor neighbor) throws Exception { sendPacket(sendingPacket, transactionViewModel, neighbor); } + /** * This thread picks up a new transaction from the broadcast queue and * spams it to all of the neigbors. Sadly, this also includes the neigbor who @@ -550,8 +637,11 @@ public void sendPacket(TransactionViewModel transactionViewModel, Neighbor neigh */ private Runnable spawnBroadcasterThread() { return () -> { + log.info("Spawning Broadcaster Thread"); + while (!shuttingDown.get()) { + try { final TransactionViewModel transactionViewModel = broadcastQueue.pollFirst(); if (transactionViewModel != null) { @@ -572,22 +662,27 @@ private Runnable spawnBroadcasterThread() { log.info("Shutting down Broadcaster Thread"); }; } + /** * We send a tip request packet (transaction corresponding to the latest milestone) * to all of our neighbors periodically. */ private Runnable spawnTipRequesterThread() { return () -> { + log.info("Spawning Tips Requester Thread"); long lastTime = 0; while (!shuttingDown.get()) { + try { final TransactionViewModel transactionViewModel = TransactionViewModel.fromHash(tangle, latestMilestoneTracker.getLatestMilestoneHash()); System.arraycopy(transactionViewModel.getBytes(), 0, tipRequestingPacket.getData(), 0, TransactionViewModel.SIZE); System.arraycopy(transactionViewModel.getHash().bytes(), 0, tipRequestingPacket.getData(), TransactionViewModel.SIZE, reqHashSize); //Hash.SIZE_IN_BYTES); + neighbors.forEach(n -> n.send(tipRequestingPacket)); + long now = System.currentTimeMillis(); if ((now - lastTime) > 10000L) { lastTime = now; @@ -600,6 +695,7 @@ private Runnable spawnTipRequesterThread() { transactionRequester.numberOfTransactionsToRequest(), getReplyQueueSize(), TransactionViewModel.getNumberOfStoredTransactions(tangle)); } + Thread.sleep(5000); } catch (final Exception e) { log.error("Tips Requester Thread Exception:", e); @@ -611,8 +707,11 @@ private Runnable spawnTipRequesterThread() { private Runnable spawnProcessReceivedThread() { return () -> { + log.info("Spawning Process Received Data Thread"); + while (!shuttingDown.get()) { + try { processReceivedDataFromQueue(); Thread.sleep(1); @@ -626,8 +725,11 @@ private Runnable spawnProcessReceivedThread() { private Runnable spawnReplyToRequestThread() { return () -> { + log.info("Spawning Reply To Request Thread"); + while (!shuttingDown.get()) { + try { replyToRequestFromQueue(); Thread.sleep(1); @@ -639,6 +741,7 @@ private Runnable spawnReplyToRequestThread() { }; } + private static ConcurrentSkipListSet weightQueue() { return new ConcurrentSkipListSet<>((transaction1, transaction2) -> { if (transaction1.weightMagnitude == transaction2.weightMagnitude) { @@ -658,12 +761,14 @@ private static ConcurrentSkipListSet> weightQueueHashPair() return new ConcurrentSkipListSet>((transaction1, transaction2) -> { Hash tx1 = transaction1.getLeft(); Hash tx2 = transaction2.getLeft(); + for (int i = Hash.SIZE_IN_BYTES; i-- > 0; ) { if (tx1.bytes()[i] != tx2.bytes()[i]) { return tx2.bytes()[i] - tx1.bytes()[i]; } } return 0; + }); } @@ -671,6 +776,7 @@ private static ConcurrentSkipListSet> weigh return new ConcurrentSkipListSet>((transaction1, transaction2) -> { TransactionViewModel tx1 = transaction1.getLeft(); TransactionViewModel tx2 = transaction2.getLeft(); + if (tx1.weightMagnitude == tx2.weightMagnitude) { for (int i = Hash.SIZE_IN_BYTES; i-- > 0; ) { if (tx1.getHash().bytes()[i] != tx2.getHash().bytes()[i]) { @@ -683,6 +789,7 @@ private static ConcurrentSkipListSet> weigh }); } + public void broadcast(final TransactionViewModel transactionViewModel) { broadcastQueue.add(transactionViewModel); if (broadcastQueue.size() > BROADCAST_QUEUE_SIZE) { @@ -702,6 +809,7 @@ private ByteBuffer getBytesDigest(byte[] receivedData) throws NoSuchAlgorithmExc } // helpers methods + public boolean removeNeighbor(final URI uri, boolean isConfigured) { final Neighbor neighbor = newNeighbor(uri, isConfigured); if (uri.getScheme().equals("tcp")) { @@ -789,6 +897,7 @@ public int getReplyQueueSize() { } public class FIFOCache { + private final int capacity; private final double dropRate; private LinkedHashMap map; @@ -821,4 +930,5 @@ public V put(K key, V value) { return this.map.put(key, value); } } -} + +} \ No newline at end of file diff --git a/src/main/java/net/helix/hlx/service/stats/TransactionStatsPublisher.java b/src/main/java/net/helix/hlx/service/stats/TransactionStatsPublisher.java index 128b25a9..f1aac607 100644 --- a/src/main/java/net/helix/hlx/service/stats/TransactionStatsPublisher.java +++ b/src/main/java/net/helix/hlx/service/stats/TransactionStatsPublisher.java @@ -43,7 +43,6 @@ public class TransactionStatsPublisher { private final AtomicBoolean shuttingDown = new AtomicBoolean(false); private Thread thread; - public TransactionStatsPublisher(Tangle tangle, TipsViewModel tipsViewModel, TipSelector tipsSelector) { this.tangle = tangle; @@ -93,12 +92,8 @@ private Hash getSuperTip() throws Exception { } private long getConfirmedTransactionsCount(Instant now) throws Exception { -<<<<<<< HEAD - return approveeCounter.getCount(now, getSuperTip(), new HashSet<>()); -======= return approveeCounter.getCount(now, getSuperTip(), new HashSet<>(), true); ->>>>>>> upstream/dev } private long getAllTransactionsCount(Instant now) throws Exception { @@ -111,13 +106,8 @@ private long getAllTransactionsCount(Instant now) throws Exception { if (approveeCounter.isInTimeWindow(now, TransactionViewModel.fromHash(tangle, tip))) { count += 1 + approveeCounter.getCount(now, tip, processedTransactions, false); } else { -<<<<<<< HEAD - // even if the tip is not in the time window, count parents that might be older - count += approveeCounter.getCount(now, tip, processedTransactions); -======= // even if the tip is not in the time window, count approvees that might be older count += approveeCounter.getCount(now, tip, processedTransactions, false); ->>>>>>> upstream/dev } } diff --git a/src/main/resources/logback-save.xml b/src/main/resources/logback-save.xml index fa1fdb09..40d961ae 100644 --- a/src/main/resources/logback-save.xml +++ b/src/main/resources/logback-save.xml @@ -27,7 +27,6 @@ %d{MM/dd HH:mm:ss.SSS} [%thread] %-5level %logger{0}:%L - %msg%n
-
${log.name} From f8844f9f7fb3e66714a1fb921af4a216060b5d69 Mon Sep 17 00:00:00 2001 From: dnck Date: Fri, 28 Jun 2019 11:46:59 +0200 Subject: [PATCH 07/15] format and style to match upstream --- .../helix/hlx/service/stats/TransactionStatsPublisher.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/main/java/net/helix/hlx/service/stats/TransactionStatsPublisher.java b/src/main/java/net/helix/hlx/service/stats/TransactionStatsPublisher.java index f1aac607..dc3155ed 100644 --- a/src/main/java/net/helix/hlx/service/stats/TransactionStatsPublisher.java +++ b/src/main/java/net/helix/hlx/service/stats/TransactionStatsPublisher.java @@ -71,7 +71,6 @@ private Runnable getRunnable() { tangle.publish(CONFIRMED_TRANSACTIONS_TOPIC + " %d", numConfirmed); tangle.publish(TOTAL_TRANSACTIONS_TOPIC + " %d", numTransactions); - } catch (Exception e) { log.error("Error while getting transaction counts : {}", e); } @@ -85,6 +84,7 @@ private Runnable getRunnable() { } private Hash getSuperTip() throws Exception { + // call the usual tip selection and return the first tip List tips = tipsSelector.getTransactionsToApprove(3, Optional.empty()); @@ -97,7 +97,8 @@ private long getConfirmedTransactionsCount(Instant now) throws Exception { } private long getAllTransactionsCount(Instant now) throws Exception { - // count all transactions in a scalable way, by counting the parents of all the tips + + // count all transactions in a scalable way, by counting the approvees of all the tips HashSet processedTransactions = new HashSet<>(); long count = 0; for (Hash tip : tipsViewModel.getTips()) { @@ -130,4 +131,4 @@ public void shutdown() { log.error("Error in shutdown", e); } } -} +} \ No newline at end of file From a1ff91312b7f5d203064e085687bb7772667b01e Mon Sep 17 00:00:00 2001 From: dnck Date: Mon, 22 Jul 2019 13:46:21 +0200 Subject: [PATCH 08/15] log output for tip selection --- src/main/java/net/helix/hlx/service/API.java | 3 +++ .../service/tipselection/impl/CumulativeWeightCalculator.java | 1 + 2 files changed, 4 insertions(+) diff --git a/src/main/java/net/helix/hlx/service/API.java b/src/main/java/net/helix/hlx/service/API.java index 5fae3a4e..38a2d11a 100644 --- a/src/main/java/net/helix/hlx/service/API.java +++ b/src/main/java/net/helix/hlx/service/API.java @@ -526,6 +526,9 @@ private synchronized AbstractResponse getTransactionsToApproveStatement(int dept try { List tips = getTransactionToApproveTips(depth, reference); + // TODO remove the forEach + tips.forEach((n) -> log.info("Selected tips: {}", n.toString())); + return GetTransactionsToApproveResponse.create(tips.get(0), tips.get(1)); } catch (Exception e) { diff --git a/src/main/java/net/helix/hlx/service/tipselection/impl/CumulativeWeightCalculator.java b/src/main/java/net/helix/hlx/service/tipselection/impl/CumulativeWeightCalculator.java index 106bf949..9c14e4b1 100644 --- a/src/main/java/net/helix/hlx/service/tipselection/impl/CumulativeWeightCalculator.java +++ b/src/main/java/net/helix/hlx/service/tipselection/impl/CumulativeWeightCalculator.java @@ -106,6 +106,7 @@ private Collection getTxDirectApproversHashes(Hash txHash, Map calculateCwInOrder(LinkedHashSet txsToRate) throws Exception { + log.info("Total # transactions to calculate weight for the walk to the tips: {}", txsToRate.size()); UnIterableMap> txHashToApprovers = createTxHashToApproversPrefixMap(); UnIterableMap txHashToCumulativeWeight = createTxHashToCumulativeWeightMap(txsToRate.size()); From 1aba2aa257f92842b7936640e9edc8550097a986 Mon Sep 17 00:00:00 2001 From: dnck Date: Wed, 4 Sep 2019 16:02:44 +0200 Subject: [PATCH 09/15] match style in upstream --- configure-many.py | 128 ------------------- src/main/java/net/helix/hlx/service/API.java | 2 +- 2 files changed, 1 insertion(+), 129 deletions(-) delete mode 100644 configure-many.py diff --git a/configure-many.py b/configure-many.py deleted file mode 100644 index 9e4e802d..00000000 --- a/configure-many.py +++ /dev/null @@ -1,128 +0,0 @@ -# -*- coding: utf-8 *-* -# Module: Topology & Neighbors -# This module will handle the static topology of the network -# and generate configuration files for the jars. -# Note that the term, "static", is important, since the overall goal -# should be to acheive a simulation that allows for node joining and -# leaving the network. -import shutil -import os -from subprocess import call -import argparse -import networkx as nx - -class SmallWorldTopology(): - def __init__(self, numNodes, averageNeighborsPerNode, probabilityAddingAdditionalNeighbors): - self.nNeighbors = averageNeighborsPerNode - self.pNewEdge = probabilityAddingAdditionalNeighbors - self.overlay = nx.watts_strogatz_graph(n=numNodes, k=self.nNeighbors, p=self.pNewEdge) - - def get_neighbor(self, nodeID): - if list(self.overlay.neighbors(nodeID)): - return list(self.overlay.neighbors(nodeID)) - - def get_matrix(self): - self.overlay_matrix = nx.to_numpy_matrix(self.overlay) - -if __name__ == "__main__": - parser = argparse.ArgumentParser(description='') - parser.add_argument('-apiPortStart', metavar='Port to start iterating at', type=int, default=14000, help='') - parser.add_argument('-udpReceiverPortStart', metavar='UDP Receiver port to start iterating at', type=int, default=24000, help='') - parser.add_argument('-tcpReceiverPortStart', metavar='TCP Receiver port to start iterating at', type=int, default=34000, help='') - parser.add_argument('-zmqIpcStart', metavar='ZMQ ipc port to start iterating at', type=int, default=1, help='') - parser.add_argument('-zmqPortStart', metavar='ZMQ port to start iterating at', type=int, default=5550, help='') - parser.add_argument('-numNodes', metavar='Number of nodes.', type=int, default=10, help='') - args = parser.parse_args() - apiPortStart = args.apiPortStart - udpReceiverPortStart = args.udpReceiverPortStart - tcpReceiverPortStart = args.tcpReceiverPortStart - inet = "127.0.0.1" - tcpInetPort = udpReceiverPortStart - udpInetPort = udpReceiverPortStart - zmqIpcStart = args.zmqIpcStart - zmqPortStart = args.zmqPortStart - numNodes = args.numNodes - topology = SmallWorldTopology(numNodes, averageNeighborsPerNode=8, probabilityAddingAdditionalNeighbors=0.1) - for i in range(numNodes): - config = """#NODE {} CONFIG -API_PORT = {} -API_HOST = localhost -REMOTE_LIMIT_API = addNeighbors, getNeighbors, removeNeighbors, attachToTangle, interruptAttachingToTangle -MAX_FIND_TRANSACTIONS = 100_000 -MAX_REQUESTS_LIST = 1_000 -MAX_GET_BYTES = 10_000 -MAX_BODY_LENGTH = 1_000_000 -#REMOTE_AUTH = -MS_DELAY = 0 -UDP_RECEIVER_PORT = {} -TCP_RECEIVER_PORT = {} -NEIGHBORS -P_REMOVE_REQUEST = 0.01 -SEND_LIMIT = -1 -MAX_PEERS = 0 -DNS_REFRESHER_ENABLED = true -DNS_RESOLUTION_ENABLED = true -HXI_DIR = hxi -DB_PATH = mainnetdb -DB_LOG_PATH = mainnet.log -DB_CACHE_SIZE = 100_000 -ROCKS_DB = rocksdb -REVALIDATE = false -RESCAN_DB = false -P_REPLY_RANDOM_TIP = 0.66 -P_DROP_TRANSACTION = 0.0 -P_SELECT_MILESTONE_CHILD = 0.7 -P_SEND_MILESTONE = 0.02 -P_PROPAGATE_REQUEST = 0.01 -MWM = 1 -PACKET_SIZE = 1200 -REQ_HASH_SIZE = 32 -QUEUE_SIZE = 1_000 -P_DROP_CACHE_ENTRY = 0.02 -CACHE_SIZE_BYTES = 150_000 -ZMQ_THREADS = 1 -ZMQ_IPC = ipc://sbx/feeds/{} -ZMQ_ENABLED = false -ZMQ_PORT = {} -GRAPH_ENABLED = false -MAX_DEPTH = 15 -ALPHA = 0.001 -TIP_SOLIDIFIER_ENABLED = true -POW_THREADS = 0 -COORDINATOR_ADDRESS = 6a8413edc634e948e3446806afde11b17e0e188faf80a59a8b1147a0600cc5db -LOCAL_SNAPSHOTS_ENABLED = true -LOCAL_SNAPSHOTS_PRUNING_ENABLED = true -LOCAL_SNAPSHOTS_PRUNING_DELAY = 50000 -LOCAL_SNAPSHOTS_INTERVAL_SYNCED = 10 -LOCAL_SNAPSHOTS_INTERVAL_UNSYNCED = 1000 -LOCAL_SNAPSHOTS_BASE_PATH = mainnet -LOCAL_SNAPSHOTS_DEPTH = 100 -SNAPSHOT_FILE = /snapshotMainnet.txt -SNAPSHOT_SIG_FILE = /snapshotMainnet.sig -PREVIOUS_EPOCHS_SPENT_ADDRESSES_TXT = /previousEpochsSpentAddresses.txt -PREVIOUS_EPOCHS_SPENT_ADDRESSES_SIG = /previousEpochsSpentAddresses.sig -GLOBAL_SNAPSHOT_TIME = 1522235533 -MILESTONE_START_INDEX = 0 -NUM_KEYS_IN_MILESTONE = 10 -MAX_ANALYZED_TXS = 20_000 -SAVELOG = true -SAVELOG_BASE_PATH = logs/ -SAVELOG_XML_FILE = /logback-save.xml -""".format(i, apiPortStart,udpReceiverPortStart,tcpReceiverPortStart,zmqIpcStart, zmqPortStart) - nodeNeighbors = "" - for n in topology.get_neighbor(i): - nodeNeighbors = nodeNeighbors + "tcp://{}:{},".format(inet, tcpInetPort+n) - nodeNeighbors = nodeNeighbors[:-1] - config = config.replace("NEIGHBORS", "NEIGHBORS = {}".format(nodeNeighbors)) - apiPortStart +=1 - udpReceiverPortStart +=1 - tcpReceiverPortStart +=1 - zmqIpcStart +=1 - zmqPortStart +=1 - numNodes +=1 - if os.path.isdir(os.path.normpath("./many_configs")): - pass - else: - os.mkdir(os.path.normpath("./many_configs")) - with open(os.path.normpath("./many_configs/config{}.ini".format(i)), "w") as f: - f.write(config) diff --git a/src/main/java/net/helix/hlx/service/API.java b/src/main/java/net/helix/hlx/service/API.java index 138e7faf..bcd02481 100644 --- a/src/main/java/net/helix/hlx/service/API.java +++ b/src/main/java/net/helix/hlx/service/API.java @@ -1716,4 +1716,4 @@ private Function, AbstractResponse> wereAddressesSpentFrom() } }; } -} +} \ No newline at end of file From 16037d603b8de967517f1daf9012be8eb25445e8 Mon Sep 17 00:00:00 2001 From: dnck Date: Wed, 4 Sep 2019 16:03:52 +0200 Subject: [PATCH 10/15] match style in upstream --- src/main/java/net/helix/hlx/network/Node.java | 2 +- src/main/java/net/helix/hlx/service/API.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/net/helix/hlx/network/Node.java b/src/main/java/net/helix/hlx/network/Node.java index 7230bc35..06df96d7 100644 --- a/src/main/java/net/helix/hlx/network/Node.java +++ b/src/main/java/net/helix/hlx/network/Node.java @@ -931,4 +931,4 @@ public V put(K key, V value) { } } -} \ No newline at end of file +} diff --git a/src/main/java/net/helix/hlx/service/API.java b/src/main/java/net/helix/hlx/service/API.java index bcd02481..138e7faf 100644 --- a/src/main/java/net/helix/hlx/service/API.java +++ b/src/main/java/net/helix/hlx/service/API.java @@ -1716,4 +1716,4 @@ private Function, AbstractResponse> wereAddressesSpentFrom() } }; } -} \ No newline at end of file +} From 9b3002e33d4d62b49130c8157206bdbf7a2eef4a Mon Sep 17 00:00:00 2001 From: dnck Date: Wed, 4 Sep 2019 16:05:55 +0200 Subject: [PATCH 11/15] match style in upstream --- .../net/helix/hlx/service/stats/TransactionStatsPublisher.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/net/helix/hlx/service/stats/TransactionStatsPublisher.java b/src/main/java/net/helix/hlx/service/stats/TransactionStatsPublisher.java index 7a798946..103fbeab 100644 --- a/src/main/java/net/helix/hlx/service/stats/TransactionStatsPublisher.java +++ b/src/main/java/net/helix/hlx/service/stats/TransactionStatsPublisher.java @@ -130,4 +130,4 @@ public void shutdown() { log.error("Error in shutdown", e); } } -} \ No newline at end of file +} From c0bfcc8adeaf01826fc77164f6dd3056aaf54432 Mon Sep 17 00:00:00 2001 From: dnck Date: Wed, 4 Sep 2019 16:20:39 +0200 Subject: [PATCH 12/15] even with upstream dev --- src/main/java/net/helix/hlx/service/API.java | 3 --- .../service/tipselection/impl/CumulativeWeightCalculator.java | 1 - 2 files changed, 4 deletions(-) diff --git a/src/main/java/net/helix/hlx/service/API.java b/src/main/java/net/helix/hlx/service/API.java index 138e7faf..a23d97b4 100644 --- a/src/main/java/net/helix/hlx/service/API.java +++ b/src/main/java/net/helix/hlx/service/API.java @@ -509,9 +509,6 @@ private synchronized AbstractResponse getTransactionsToApproveStatement(int dept try { List tips = getTransactionToApproveTips(depth, reference); - // TODO remove the forEach - tips.forEach((n) -> log.info("Selected tips: {}", n.toString())); - return GetTransactionsToApproveResponse.create(tips.get(0), tips.get(1)); } catch (Exception e) { diff --git a/src/main/java/net/helix/hlx/service/tipselection/impl/CumulativeWeightCalculator.java b/src/main/java/net/helix/hlx/service/tipselection/impl/CumulativeWeightCalculator.java index 9c14e4b1..106bf949 100644 --- a/src/main/java/net/helix/hlx/service/tipselection/impl/CumulativeWeightCalculator.java +++ b/src/main/java/net/helix/hlx/service/tipselection/impl/CumulativeWeightCalculator.java @@ -106,7 +106,6 @@ private Collection getTxDirectApproversHashes(Hash txHash, Map calculateCwInOrder(LinkedHashSet txsToRate) throws Exception { - log.info("Total # transactions to calculate weight for the walk to the tips: {}", txsToRate.size()); UnIterableMap> txHashToApprovers = createTxHashToApproversPrefixMap(); UnIterableMap txHashToCumulativeWeight = createTxHashToCumulativeWeightMap(txsToRate.size()); From 001229bc479186404fc38de277bdf56387e40383 Mon Sep 17 00:00:00 2001 From: dnck Date: Wed, 4 Sep 2019 16:48:01 +0200 Subject: [PATCH 13/15] clean up removedb.py --- removedb.py | 60 ++++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 46 insertions(+), 14 deletions(-) diff --git a/removedb.py b/removedb.py index 8d614efb..b2278922 100644 --- a/removedb.py +++ b/removedb.py @@ -1,31 +1,63 @@ +# -*- coding: utf-8 *-* +""" +Python script to make life easier when starting a fresh node. + +Usage: + # Remove all db and snapshot stuff and compiles package: + python removedb.py + # Remove but do not compile + python removedb.py -compile False + # Compile but do not remove + python removedb.py -compile True + # Only cp + python removedb.py -all False -compile False -cp ~/helix/testnet/fork1/testnet-1.0 +""" import shutil import os from subprocess import call import argparse if __name__ == "__main__": - parser = argparse.ArgumentParser(description='Remove the databases from the node to start fresh') - parser.add_argument('-all', metavar='Remove everything', type=bool, default=True, help='Remove everything including the snapshot stuff') - parser.add_argument('-compile', metavar='run mvn clean compile package', type=bool, default=False, help='run compile project after removing stuff') - parser.add_argument('-cpr', metavar='copy and paste the src and target of a compiled version here', type=bool, default=False, help='cp -r dir1 -> pwd') + parser = argparse.ArgumentParser( + description='Remove the databases and/or compile to start a node fresh' + ) + + parser.add_argument('-all', + metavar='Remove everything', type=lambda s: s.lower() in ['true', 't', 'yes', '1'], default='True', + help='Remove everything including the snapshot stuff.' + ) + + parser.add_argument('-compile', + metavar='run mvn clean compile package', type=lambda s: s.lower() in ['true', 't', 'yes', '1'], default='True', + help='run compile project after removing stuff' + ) + + parser.add_argument('-cp', + metavar='cp ./src && ./target of cur wd to the dir supplied on cmd line', type=str, default='', + help='cp -r pwd dir' + ) + args = parser.parse_args() + dbs = ['db', 'hxi', 'spent-addresses-log', 'spent-addresses-db', 'mainnet.log', 'mainnetdb'] fs = ['mainnet.snapshot.meta', 'mainnet.snapshot.state', 'mainnet.snapshot.meta.bkp', 'mainnet.snapshot.state.bkp'] - for dir in dbs: - try: - shutil.rmtree(dir) - except: - print("{} does not exist".format(dir)) + if args.all: + for dir in dbs: + try: + shutil.rmtree(dir) + except: + print("{} does not exist".format(dir)) + for f in fs: try: os.remove(f) except: print("{} does not exist".format(f)) + if args.compile: call('mvn clean compile package', shell=True) - if args.cpr: - shutil.rmtree('src') - shutil.rmtree('target') - call('cp -r /home/hlx-dev/helix/testnet/fork0/testnet-1.0/src /home/hlx-dev/helix/testnet/fork1/testnet-1.0/', shell=True) - call('cp -r /home/hlx-dev/helix/testnet/fork0/testnet-1.0/target /home/hlx-dev/helix/testnet/fork1/testnet-1.0/', shell=True) + + if os.path.isdir(args.cp): + call('cp -r ./src {}'.format(args.cp), shell=True) + call('cp -r ./target {}'.format(args.cp), shell=True) From e6ecec531bf8fb85385b492c78eed3a4276e90de Mon Sep 17 00:00:00 2001 From: dnck Date: Thu, 5 Sep 2019 11:13:26 +0200 Subject: [PATCH 14/15] save logs refactor --- .../net/helix/hlx/utils/HelixIOUtils.java | 33 ++++++++++--------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/src/main/java/net/helix/hlx/utils/HelixIOUtils.java b/src/main/java/net/helix/hlx/utils/HelixIOUtils.java index ebc2ef81..09ed7a87 100644 --- a/src/main/java/net/helix/hlx/utils/HelixIOUtils.java +++ b/src/main/java/net/helix/hlx/utils/HelixIOUtils.java @@ -6,6 +6,7 @@ import java.nio.file.Paths; import java.text.SimpleDateFormat; import java.util.Date; +import java.util.UUID; public class HelixIOUtils extends IOUtils { @@ -22,21 +23,23 @@ public static void closeQuietly(AutoCloseable... autoCloseables) { } public static void saveLogs() { - String date_parsed = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss").format(new Date()); - String root_dir = System.getProperty("user.dir"); - String slash = System.getProperty("file.separator"); - String logback_xml_filepath = root_dir + slash + "src" + slash + "main" + slash + "resources" + slash + "logback-save.xml"; - String logs_dir = root_dir + slash + "logs"; - String log_filepath = root_dir + slash + "logs" + slash + "LOG__"+date_parsed+"__.log"; - - System.setProperty("log.name", log_filepath); - System.setProperty("logback.configurationFile", logback_xml_filepath); - - File path_to_log_dir = Paths.get(logs_dir).toFile(); - + String dateParsed = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss").format(new Date()); + UUID uuid = UUID.randomUUID(); + String uuidString = uuid.toString(); + String rootDir = System.getProperty("user.dir"); + String fileSeperator = System.getProperty("file.separator"); + String logbackXmlFilepath = String.join( + fileSeperator, rootDir, "src", "main", "resources", "logback-save.xml" + ); + String logsDir = String.join(fileSeperator, rootDir, "logs"); + String logName = "log-" + dateParsed + "-" + uuidString + ".log"; + String logFilepath = String.join(fileSeperator, rootDir, "logs", logName); + System.setProperty("log.name", logFilepath); + System.setProperty("logback.configurationFile", logbackXmlFilepath); + File pathToLogDir = Paths.get(logsDir).toFile(); // check whether path to logs exists and logs is a directory. - if (!path_to_log_dir.exists() || !path_to_log_dir.isDirectory()) { - path_to_log_dir.mkdir(); + if (!pathToLogDir.exists() || !pathToLogDir.isDirectory()) { + pathToLogDir.mkdir(); } } -} +} \ No newline at end of file From 71f82e3331185860e1be4fbba0201f27b42a3626 Mon Sep 17 00:00:00 2001 From: dnck Date: Thu, 5 Sep 2019 11:16:47 +0200 Subject: [PATCH 15/15] remove utility python script from commit --- removedb.py | 63 ----------------------------------------------------- 1 file changed, 63 deletions(-) delete mode 100644 removedb.py diff --git a/removedb.py b/removedb.py deleted file mode 100644 index b2278922..00000000 --- a/removedb.py +++ /dev/null @@ -1,63 +0,0 @@ -# -*- coding: utf-8 *-* -""" -Python script to make life easier when starting a fresh node. - -Usage: - # Remove all db and snapshot stuff and compiles package: - python removedb.py - # Remove but do not compile - python removedb.py -compile False - # Compile but do not remove - python removedb.py -compile True - # Only cp - python removedb.py -all False -compile False -cp ~/helix/testnet/fork1/testnet-1.0 -""" -import shutil -import os -from subprocess import call -import argparse - -if __name__ == "__main__": - parser = argparse.ArgumentParser( - description='Remove the databases and/or compile to start a node fresh' - ) - - parser.add_argument('-all', - metavar='Remove everything', type=lambda s: s.lower() in ['true', 't', 'yes', '1'], default='True', - help='Remove everything including the snapshot stuff.' - ) - - parser.add_argument('-compile', - metavar='run mvn clean compile package', type=lambda s: s.lower() in ['true', 't', 'yes', '1'], default='True', - help='run compile project after removing stuff' - ) - - parser.add_argument('-cp', - metavar='cp ./src && ./target of cur wd to the dir supplied on cmd line', type=str, default='', - help='cp -r pwd dir' - ) - - args = parser.parse_args() - - dbs = ['db', 'hxi', 'spent-addresses-log', 'spent-addresses-db', 'mainnet.log', 'mainnetdb'] - fs = ['mainnet.snapshot.meta', 'mainnet.snapshot.state', 'mainnet.snapshot.meta.bkp', 'mainnet.snapshot.state.bkp'] - - if args.all: - for dir in dbs: - try: - shutil.rmtree(dir) - except: - print("{} does not exist".format(dir)) - - for f in fs: - try: - os.remove(f) - except: - print("{} does not exist".format(f)) - - if args.compile: - call('mvn clean compile package', shell=True) - - if os.path.isdir(args.cp): - call('cp -r ./src {}'.format(args.cp), shell=True) - call('cp -r ./target {}'.format(args.cp), shell=True)