Skip to content

Commit

Permalink
Peer: Break out the onTransactionBroadcast event into a separate inte…
Browse files Browse the repository at this point in the history
…rface.
  • Loading branch information
mikehearn committed Sep 18, 2015
1 parent c2a67aa commit 7014810
Show file tree
Hide file tree
Showing 10 changed files with 99 additions and 116 deletions.
58 changes: 27 additions & 31 deletions core/src/main/java/org/bitcoinj/core/Peer.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public PeerConnectionListenerRegistration(PeerConnectionEventListener listener,
}
private final CopyOnWriteArrayList<PeerConnectionListenerRegistration> connectionEventListeners;
private final CopyOnWriteArrayList<ListenerRegistration<PeerDataEventListener>> dataEventListeners;
private final CopyOnWriteArrayList<ListenerRegistration<OnTransactionBroadcastListener>> onTransactionEventListeners;
// Whether to try and download blocks and transactions from this peer. Set to false by PeerGroup if not the
// primary peer. This is to avoid redundant work and concurrency problems with downloading the same chain
// in parallel.
Expand Down Expand Up @@ -217,6 +218,7 @@ public Peer(NetworkParameters params, VersionMessage ver, PeerAddress remoteAddr
this.getDataFutures = new CopyOnWriteArrayList<GetDataRequest>();
this.connectionEventListeners = new CopyOnWriteArrayList<PeerConnectionListenerRegistration>();
this.dataEventListeners = new CopyOnWriteArrayList<ListenerRegistration<PeerDataEventListener>>();
this.onTransactionEventListeners = new CopyOnWriteArrayList<ListenerRegistration<OnTransactionBroadcastListener>>();
this.getAddrFutures = new LinkedList<SettableFuture<AddressMessage>>();
this.fastCatchupTimeSecs = params.getGenesisBlock().getTimeSeconds();
this.isAcked = false;
Expand Down Expand Up @@ -255,57 +257,47 @@ public void addEventListener(AbstractPeerEventListener listener) {
public void addEventListener(AbstractPeerEventListener listener, Executor executor) {
addConnectionEventListener(executor, listener);
addDataEventListener(executor, listener);
addOnTransactionBroadcastListener(executor, listener);
}

/** Deprecated: use the more specific event handler methods instead */
@Deprecated
public void removeEventListener(AbstractPeerEventListener listener) {
removeConnectionEventListener(listener);
removeDataEventListener(listener);
removeOnTransactionBroadcastListener(listener);
}

/**
* Registers the given object as an event listener that will be invoked on the user thread. Note that listeners
* added this way will <b>not</b> receive {@link PeerEventListener#getData(Peer, GetDataMessage)} or
* {@link PeerEventListener#onPreMessageReceived(Peer, Message)} calls because those require that the listener
* be added using {@link Threading#SAME_THREAD}, which requires the other addListener form.
*/
/** Registers a listener that is invoked when a peer is connected or disconnected. */
public void addConnectionEventListener(PeerConnectionEventListener listener) {
addConnectionEventListener(Threading.USER_THREAD, listener);
}

/**
* Registers the given object as an event listener that will be invoked on the user thread. Note that listeners
* added this way will <b>not</b> receive {@link PeerEventListener#getData(Peer, GetDataMessage)} or
* {@link PeerEventListener#onPreMessageReceived(Peer, Message)} calls because those require that the listener
* be added using {@link Threading#SAME_THREAD}, which requires the other addListener form.
*/
public void addDataEventListener(PeerDataEventListener listener) {
addDataEventListener(Threading.USER_THREAD, listener);
}

/**
* Registers the given object as an event listener that will be invoked by the given executor. Note that listeners
* added using any other executor than {@link Threading#SAME_THREAD} will <b>not</b> receive
* {@link PeerEventListener#getData(Peer, GetDataMessage)} or
* {@link PeerEventListener#onPreMessageReceived(Peer, Message)} calls because this class is not willing to cross
* threads in order to get the results of those hook methods.
*/
/** Registers a listener that is invoked when a peer is connected or disconnected. */
public void addConnectionEventListener(Executor executor, PeerConnectionEventListener listener) {
connectionEventListeners.add(new PeerConnectionListenerRegistration(listener, executor));
}

/**
* Registers the given object as an event listener that will be invoked by the given executor. Note that listeners
* added using any other executor than {@link Threading#SAME_THREAD} will <b>not</b> receive
* {@link PeerEventListener#getData(Peer, GetDataMessage)} or
* {@link PeerEventListener#onPreMessageReceived(Peer, Message)} calls because this class is not willing to cross
* threads in order to get the results of those hook methods.
*/
/** Registers a listener that is called when messages are received. */
public void addDataEventListener(PeerDataEventListener listener) {
addDataEventListener(Threading.USER_THREAD, listener);
}

/** Registers a listener that is called when messages are received. */
public void addDataEventListener(Executor executor, PeerDataEventListener listener) {
dataEventListeners.add(new ListenerRegistration<PeerDataEventListener>(executor, listener));
}

/** Registers a listener that is called when a transaction is broadcast across the network */
public void addOnTransactionBroadcastListener(OnTransactionBroadcastListener listener) {
addOnTransactionBroadcastListener(Threading.USER_THREAD, listener);
}

/** Registers a listener that is called when a transaction is broadcast across the network */
public void addOnTransactionBroadcastListener(Executor executor, OnTransactionBroadcastListener listener) {
onTransactionEventListeners.add(new ListenerRegistration<OnTransactionBroadcastListener>(executor, listener));
}

// Package-local version for PeerGroup
void addConnectionEventListenerWithoutOnDisconnect(Executor executor, PeerConnectionEventListener listener) {
connectionEventListeners.add(new PeerConnectionListenerRegistration(listener, executor, false));
Expand All @@ -319,6 +311,10 @@ public boolean removeDataEventListener(PeerDataEventListener listener) {
return ListenerRegistration.removeFromList(listener, dataEventListeners);
}

public boolean removeOnTransactionBroadcastListener(OnTransactionBroadcastListener listener) {
return ListenerRegistration.removeFromList(listener, onTransactionEventListeners);
}

@Override
public String toString() {
PeerAddress addr = getAddress();
Expand Down Expand Up @@ -739,7 +735,7 @@ public void onFailure(Throwable throwable) {
}
// Tell all listeners about this tx so they can decide whether to keep it or not. If no listener keeps a
// reference around then the memory pool will forget about it after a while too because it uses weak references.
for (final ListenerRegistration<PeerDataEventListener> registration : dataEventListeners) {
for (final ListenerRegistration<OnTransactionBroadcastListener> registration : onTransactionEventListeners) {
registration.executor.execute(new Runnable() {
@Override
public void run() {
Expand Down
89 changes: 43 additions & 46 deletions core/src/main/java/org/bitcoinj/core/PeerGroup.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,7 @@
import com.squareup.okhttp.*;
import com.subgraph.orchid.*;
import net.jcip.annotations.*;
import org.bitcoinj.core.listeners.AbstractPeerConnectionEventListener;
import org.bitcoinj.core.listeners.AbstractPeerDataEventListener;
import org.bitcoinj.core.listeners.AbstractWalletEventListener;
import org.bitcoinj.core.listeners.DownloadProgressTracker;
import org.bitcoinj.core.listeners.PeerConnectionEventListener;
import org.bitcoinj.core.listeners.PeerDataEventListener;
import org.bitcoinj.core.listeners.WalletEventListener;
import org.bitcoinj.core.listeners.*;
import org.bitcoinj.crypto.*;
import org.bitcoinj.net.*;
import org.bitcoinj.net.discovery.*;
Expand Down Expand Up @@ -119,10 +113,11 @@ public class PeerGroup implements TransactionBroadcaster {
@GuardedBy("lock") private Peer downloadPeer;
// Callback for events related to chain download.
@Nullable @GuardedBy("lock") private PeerDataEventListener downloadListener;
// Callbacks for events related to peer connection/disconnection
private final CopyOnWriteArrayList<ListenerRegistration<PeerConnectionEventListener>> peerConnectionEventListeners;
// Callbacks for events related to peer data being received
private final CopyOnWriteArrayList<ListenerRegistration<PeerDataEventListener>> peerDataEventListeners;
/** Callbacks for events related to peer connection/disconnection */
protected final CopyOnWriteArrayList<ListenerRegistration<PeerConnectionEventListener>> peerConnectionEventListeners;
/** Callbacks for events related to peer data being received */
protected final CopyOnWriteArrayList<ListenerRegistration<PeerDataEventListener>> peerDataEventListeners;
protected final CopyOnWriteArrayList<ListenerRegistration<OnTransactionBroadcastListener>> onTransactionBroadastEventListeners;
// Peer discovery sources, will be polled occasionally if there aren't enough inactives.
private final CopyOnWriteArraySet<PeerDiscovery> peerDiscoverers;
// The version message to use for new connections.
Expand Down Expand Up @@ -411,6 +406,7 @@ public int compare(PeerAddress a, PeerAddress b) {
peerDiscoverers = new CopyOnWriteArraySet<PeerDiscovery>();
peerConnectionEventListeners = new CopyOnWriteArrayList<ListenerRegistration<PeerConnectionEventListener>>();
peerDataEventListeners = new CopyOnWriteArrayList<ListenerRegistration<PeerDataEventListener>>();
onTransactionBroadastEventListeners = new CopyOnWriteArrayList<ListenerRegistration<OnTransactionBroadcastListener>>();
runningBroadcasts = Collections.synchronizedSet(new HashSet<TransactionBroadcast>());
bloomFilterMerger = new FilterMerger(DEFAULT_BLOOM_FILTER_FP_RATE);
}
Expand Down Expand Up @@ -691,17 +687,7 @@ public void addConnectionEventListener(Executor executor, PeerConnectionEventLis
peer.addConnectionEventListener(executor, listener);
}

/**
* <p>Adds a listener that will be notified on the given executor when:</p>
* <ol>
* <li>New peers are connected to.</li>
* <li>Peers are disconnected from.</li>
* <li>A message is received by the download peer (there is always one peer which is elected as a peer which
* will be used to retrieve data).
* <li>Blocks are downloaded by the download peer.</li>
* </li>
* </ol>
*/
/** See {@link Peer#addDataEventListener(Executor, PeerDataEventListener)} */
public void addDataEventListener(final Executor executor, final PeerDataEventListener listener) {
peerDataEventListeners.add(new ListenerRegistration<PeerDataEventListener>(executor, checkNotNull(listener)));
for (Peer peer : getConnectedPeers())
Expand All @@ -710,22 +696,30 @@ public void addDataEventListener(final Executor executor, final PeerDataEventLis
peer.addDataEventListener(executor, listener);
}

/**
* Same as {@link PeerGroup#addEventListener(PeerEventListener, java.util.concurrent.Executor)} but defaults
* to running on the user thread.
*/
public void addConnectionEventListener(PeerConnectionEventListener listener) {
addConnectionEventListener(Threading.USER_THREAD, listener);
}

/**
* Same as {@link PeerGroup#addEventListener(PeerEventListener, java.util.concurrent.Executor)} but defaults
* to running on the user thread.
*/
/** See {@link Peer#addDataEventListener(PeerDataEventListener)} */
public void addDataEventListener(PeerDataEventListener listener) {
addDataEventListener(Threading.USER_THREAD, listener);
}

/** See {@link Peer#addOnTransactionBroadcastListener(OnTransactionBroadcastListener)} */
public void addOnTransactionBroadcastListener(OnTransactionBroadcastListener listener) {
addOnTransactionBroadcastListener(Threading.USER_THREAD, listener);
}

/** See {@link Peer#addOnTransactionBroadcastListener(OnTransactionBroadcastListener)} */
public void addOnTransactionBroadcastListener(Executor executor, OnTransactionBroadcastListener listener) {
onTransactionBroadastEventListeners.add(new ListenerRegistration<OnTransactionBroadcastListener>(executor, checkNotNull(listener)));
for (Peer peer : getConnectedPeers())
peer.addOnTransactionBroadcastListener(executor, listener);
for (Peer peer: getPendingPeers())
peer.addOnTransactionBroadcastListener(executor, listener);
}

/** See {@link Peer#addConnectionEventListener(PeerConnectionEventListener)} */
public void addConnectionEventListener(PeerConnectionEventListener listener) {
addConnectionEventListener(Threading.USER_THREAD, listener);
}

/** The given event listener will no longer be called with events. */
public boolean removeConnectionEventListener(PeerConnectionEventListener listener) {
boolean result = ListenerRegistration.removeFromList(listener, peerConnectionEventListeners);
Expand All @@ -746,12 +740,14 @@ public boolean removeDataEventListener(PeerDataEventListener listener) {
return result;
}

/**
* Removes all event listeners simultaneously. Note that this includes listeners added internally by the framework
* so it's generally not advised to use this - it exists for special purposes only.
*/
public void clearEventListeners() {
peerConnectionEventListeners.clear();
/** The given event listener will no longer be called with events. */
public boolean removeOnTransactionBroadcastListener(OnTransactionBroadcastListener listener) {
boolean result = ListenerRegistration.removeFromList(listener, onTransactionBroadastEventListeners);
for (Peer peer : getConnectedPeers())
peer.removeOnTransactionBroadcastListener(listener);
for (Peer peer : getPendingPeers())
peer.removeOnTransactionBroadcastListener(listener);
return result;
}

/**
Expand Down Expand Up @@ -1393,12 +1389,12 @@ protected void handleNewPeer(final Peer peer) {
// Make sure the peer knows how to upload transactions that are requested from us.
peer.addDataEventListener(Threading.SAME_THREAD, peerListener);
// And set up event listeners for clients. This will allow them to find out about new transactions and blocks.
for (ListenerRegistration<PeerConnectionEventListener> registration : peerConnectionEventListeners) {
for (ListenerRegistration<PeerConnectionEventListener> registration : peerConnectionEventListeners)
peer.addConnectionEventListenerWithoutOnDisconnect(registration.executor, registration.listener);
}
for (ListenerRegistration<PeerDataEventListener> registration : peerDataEventListeners) {
for (ListenerRegistration<PeerDataEventListener> registration : peerDataEventListeners)
peer.addDataEventListener(registration.executor, registration.listener);
}
for (ListenerRegistration<OnTransactionBroadcastListener> registration : onTransactionBroadastEventListeners)
peer.addOnTransactionBroadcastListener(registration.executor, registration.listener);
} finally {
lock.unlock();
}
Expand Down Expand Up @@ -1564,9 +1560,10 @@ protected void handlePeerDeath(final Peer peer, @Nullable Throwable exception) {
}

final int fNumConnectedPeers = numConnectedPeers;
for (ListenerRegistration<PeerDataEventListener> registration : peerDataEventListeners) {
for (ListenerRegistration<PeerDataEventListener> registration : peerDataEventListeners)
peer.removeDataEventListener(registration.listener);
}
for (ListenerRegistration<OnTransactionBroadcastListener> registration : onTransactionBroadastEventListeners)
peer.removeOnTransactionBroadcastListener(registration.listener);
for (final ListenerRegistration<PeerConnectionEventListener> registration : peerConnectionEventListeners) {
registration.executor.execute(new Runnable() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,10 @@

package org.bitcoinj.core.listeners;

import org.bitcoinj.core.Block;
import org.bitcoinj.core.FilteredBlock;
import org.bitcoinj.core.GetDataMessage;
import org.bitcoinj.core.Message;
import org.bitcoinj.core.Peer;
import org.bitcoinj.core.Transaction;
import org.bitcoinj.core.*;

import javax.annotation.*;
import java.util.List;
import java.util.*;

/**
* Deprecated: implement the more specific event listener interfaces instead to fill out only what you need
Expand All @@ -44,10 +40,6 @@ public Message onPreMessageReceived(Peer peer, Message m) {
return m;
}

@Override
public void onTransaction(Peer peer, Transaction t) {
}

@Override
public List<Message> getData(Peer peer, GetDataMessage m) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
* Deprecated: implement the more specific event listener interfaces instead to fill out only what you need
*/
@Deprecated
public abstract class AbstractPeerEventListener extends AbstractPeerDataEventListener implements PeerConnectionEventListener {
public abstract class AbstractPeerEventListener extends AbstractPeerDataEventListener implements PeerConnectionEventListener, OnTransactionBroadcastListener {
@Override
public void onBlocksDownloaded(Peer peer, Block block, @Nullable FilteredBlock filteredBlock, int blocksLeft) {
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package org.bitcoinj.core.listeners;

import org.bitcoinj.core.*;

/**
* Called when a new transaction is broadcast over the network.
*/
public interface OnTransactionBroadcastListener {
/**
* Called when a new transaction is broadcast over the network.
*/
void onTransaction(Peer peer, Transaction t);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,10 @@

package org.bitcoinj.core.listeners;

import org.bitcoinj.core.Block;
import org.bitcoinj.core.FilteredBlock;
import org.bitcoinj.core.GetDataMessage;
import org.bitcoinj.core.Message;
import org.bitcoinj.core.Peer;
import org.bitcoinj.core.Transaction;
import javax.annotation.Nullable;
import java.util.List;
import org.bitcoinj.core.*;

import javax.annotation.*;
import java.util.*;

/**
* <p>Implementors can listen to events like blocks being downloaded/transactions being broadcast/connect/disconnects,
Expand Down Expand Up @@ -65,11 +61,6 @@ public interface PeerDataEventListener {
*/
Message onPreMessageReceived(Peer peer, Message m);

/**
* Called when a new transaction is broadcast over the network.
*/
void onTransaction(Peer peer, Transaction t);

/**
* <p>Called when a peer receives a getdata message, usually in response to an "inv" being broadcast. Return as many
* items as possible which appear in the {@link GetDataMessage}, or null if you're not interested in responding.</p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@

package org.bitcoinj.jni;

import org.bitcoinj.core.listeners.PeerConnectionEventListener;
import org.bitcoinj.core.listeners.PeerDataEventListener;
import org.bitcoinj.core.listeners.*;
import org.bitcoinj.core.*;

import javax.annotation.*;
Expand All @@ -29,7 +28,7 @@
* this class using JNI on the native side, thus several instances of this can point to different actual
* native implementations.
*/
public class NativePeerEventListener implements PeerConnectionEventListener, PeerDataEventListener {
public class NativePeerEventListener implements PeerConnectionEventListener, PeerDataEventListener, OnTransactionBroadcastListener {
public long ptr;

@Override
Expand Down

0 comments on commit 7014810

Please sign in to comment.