Skip to content

Commit

Permalink
Refactor listener interfaces.
Browse files Browse the repository at this point in the history
Refactor listener interfaces into their own package.
Split listener interfaces into smaller interfaces.
Make abstract implementations actually abstract.
Rearrange methods for adding listeners to put executor first.
  • Loading branch information
Ross Nicoll authored and schildbach committed Aug 8, 2015
1 parent bd080ac commit ecbd021
Show file tree
Hide file tree
Showing 50 changed files with 861 additions and 358 deletions.
135 changes: 107 additions & 28 deletions core/src/main/java/org/bitcoinj/core/AbstractBlockChain.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@

package org.bitcoinj.core;

import org.bitcoinj.core.listeners.NewBestBlockListener;
import org.bitcoinj.core.listeners.ReorganizeListener;
import org.bitcoinj.core.listeners.TransactionReceivedInBlockListener;
import org.bitcoinj.store.BlockStore;
import org.bitcoinj.store.BlockStoreException;
import org.bitcoinj.utils.ListenerRegistration;
import org.bitcoinj.utils.Threading;
import org.bitcoinj.utils.VersionTally;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListenableFuture;
Expand All @@ -35,13 +39,12 @@
import java.util.concurrent.locks.ReentrantLock;

import static com.google.common.base.Preconditions.*;
import org.bitcoinj.utils.VersionTally;

/**
* <p>An AbstractBlockChain holds a series of {@link Block} objects, links them together, and knows how to verify that
* the chain follows the rules of the {@link NetworkParameters} for this chain.</p>
*
* <p>It can be connected to a {@link Wallet}, and also {@link BlockChainListener}s that can receive transactions and
* <p>It can be connected to a {@link Wallet}, and also {@link TransactionReceivedInBlockListener}s that can receive transactions and
* notifications of re-organizations.</p>
*
* <p>An AbstractBlockChain implementation must be connected to a {@link BlockStore} implementation. The chain object
Expand Down Expand Up @@ -105,7 +108,9 @@ public abstract class AbstractBlockChain {
private final Object chainHeadLock = new Object();

protected final NetworkParameters params;
private final CopyOnWriteArrayList<ListenerRegistration<BlockChainListener>> listeners;
private final CopyOnWriteArrayList<ListenerRegistration<NewBestBlockListener>> newBestBlockListeners;
private final CopyOnWriteArrayList<ListenerRegistration<ReorganizeListener>> reorganizeListeners;
private final CopyOnWriteArrayList<ListenerRegistration<TransactionReceivedInBlockListener>> transactionReceivedListeners;

// Holds a block header and, optionally, a list of tx hashes or block's transactions
class OrphanBlock {
Expand Down Expand Up @@ -140,22 +145,28 @@ class OrphanBlock {
private final VersionTally versionTally;

/** See {@link #AbstractBlockChain(Context, List, BlockStore)} */
public AbstractBlockChain(NetworkParameters params, List<BlockChainListener> listeners,
public AbstractBlockChain(NetworkParameters params, List<? extends Wallet> transactionReceivedListeners,
BlockStore blockStore) throws BlockStoreException {
this(Context.getOrCreate(params), listeners, blockStore);
this(Context.getOrCreate(params), transactionReceivedListeners, blockStore);
}

/**
* Constructs a BlockChain connected to the given list of listeners (eg, wallets) and a store.
*/
public AbstractBlockChain(Context context, List<BlockChainListener> listeners,
public AbstractBlockChain(Context context, List<? extends Wallet> wallets,
BlockStore blockStore) throws BlockStoreException {
this.blockStore = blockStore;
chainHead = blockStore.getChainHead();
log.info("chain head is at height {}:\n{}", chainHead.getHeight(), chainHead.getHeader());
this.params = context.getParams();
this.listeners = new CopyOnWriteArrayList<ListenerRegistration<BlockChainListener>>();
for (BlockChainListener l : listeners) addListener(l, Threading.SAME_THREAD);

this.newBestBlockListeners = new CopyOnWriteArrayList<ListenerRegistration<NewBestBlockListener>>();
this.reorganizeListeners = new CopyOnWriteArrayList<ListenerRegistration<ReorganizeListener>>();
this.transactionReceivedListeners = new CopyOnWriteArrayList<ListenerRegistration<TransactionReceivedInBlockListener>>();
for (NewBestBlockListener l : wallets) addNewBestBlockListener(Threading.SAME_THREAD, l);
for (ReorganizeListener l : wallets) addReorganizeListener(Threading.SAME_THREAD, l);
for (TransactionReceivedInBlockListener l : wallets) addTransactionReceivedListener(Threading.SAME_THREAD, l);

this.versionTally = new VersionTally(context.getParams());
this.versionTally.initialize(blockStore, chainHead);
}
Expand All @@ -167,7 +178,9 @@ public AbstractBlockChain(Context context, List<BlockChainListener> listeners,
* wallets is not well tested!
*/
public void addWallet(Wallet wallet) {
addListener(wallet, Threading.SAME_THREAD);
addNewBestBlockListener(Threading.SAME_THREAD, wallet);
addReorganizeListener(Threading.SAME_THREAD, wallet);
addTransactionReceivedListener(Threading.SAME_THREAD, wallet);
int walletHeight = wallet.getLastBlockSeenHeight();
int chainHeight = getBestChainHeight();
if (walletHeight != chainHeight) {
Expand All @@ -190,28 +203,72 @@ public void addWallet(Wallet wallet) {

/** Removes a wallet from the chain. */
public void removeWallet(Wallet wallet) {
removeListener(wallet);
removeNewBestBlockListener(wallet);
removeReorganizeListener(wallet);
removeTransactionReceivedListener(wallet);
}

/**
* Adds a generic {@link NewBestBlockListener} listener to the chain.
*/
public final void addNewBestBlockListener(final NewBestBlockListener listener) {
addNewBestBlockListener(Threading.USER_THREAD, listener);
}

/**
* Adds a generic {@link BlockChainListener} listener to the chain.
* Adds a generic {@link TransactionReceivedInBlockListener} listener to the chain.
*/
public void addListener(BlockChainListener listener) {
addListener(listener, Threading.USER_THREAD);
public final void addNewBestBlockListener(Executor executor, final NewBestBlockListener listener) {
newBestBlockListeners.add(new ListenerRegistration<NewBestBlockListener>(executor, listener));
}

/**
* Adds a generic {@link BlockChainListener} listener to the chain.
* Adds a generic {@link NewBestBlockListener} listener to the chain.
*/
public void addListener(BlockChainListener listener, Executor executor) {
listeners.add(new ListenerRegistration<BlockChainListener>(listener, executor));
public final void addReorganizeListener(final ReorganizeListener listener) {
addReorganizeListener(Threading.USER_THREAD, listener);
}

/**
* Removes the given {@link BlockChainListener} from the chain.
* Adds a generic {@link TransactionReceivedInBlockListener} listener to the chain.
*/
public void removeListener(BlockChainListener listener) {
ListenerRegistration.removeFromList(listener, listeners);
public final void addReorganizeListener(Executor executor, final ReorganizeListener listener) {
reorganizeListeners.add(new ListenerRegistration<ReorganizeListener>(executor, listener));
}

/**
* Adds a generic {@link TransactionReceivedInBlockListener} listener to the chain.
*/
public final void addTransactionReceivedListener(TransactionReceivedInBlockListener listener) {
addTransactionReceivedListener(Threading.USER_THREAD, listener);
}

/**
* Adds a generic {@link TransactionReceivedInBlockListener} listener to the chain.
*/
public final void addTransactionReceivedListener(Executor executor, TransactionReceivedInBlockListener listener) {
transactionReceivedListeners.add(new ListenerRegistration<TransactionReceivedInBlockListener>(executor, listener));
}

/**
* Removes the given {@link NewBestBlockListener} from the chain.
*/
public void removeNewBestBlockListener(NewBestBlockListener listener) {
ListenerRegistration.removeFromList(listener, newBestBlockListeners);
}

/**
* Removes the given {@link ReorganizeListener} from the chain.
*/
public void removeReorganizeListener(ReorganizeListener listener) {
ListenerRegistration.removeFromList(listener, reorganizeListeners);
}

/**
* Removes the given {@link TransactionReceivedInBlockListener} from the chain.
*/
public void removeTransactionReceivedListener(TransactionReceivedInBlockListener listener) {
ListenerRegistration.removeFromList(listener, transactionReceivedListeners);
}

/**
Expand Down Expand Up @@ -542,12 +599,11 @@ private void informListenersForNewBlock(final Block block, final NewBlockType ne
boolean first = true;
Set<Sha256Hash> falsePositives = Sets.newHashSet();
if (filteredTxHashList != null) falsePositives.addAll(filteredTxHashList);
for (final ListenerRegistration<BlockChainListener> registration : listeners) {

for (final ListenerRegistration<TransactionReceivedInBlockListener> registration : transactionReceivedListeners) {
if (registration.executor == Threading.SAME_THREAD) {
informListenerForNewTransactions(block, newBlockType, filteredTxHashList, filteredTxn,
newStoredBlock, first, registration.listener, falsePositives);
if (newBlockType == NewBlockType.BEST_CHAIN)
registration.listener.notifyNewBestBlock(newStoredBlock);
} else {
// Listener wants to be run on some other thread, so marshal it across here.
final boolean notFirst = !first;
Expand All @@ -559,6 +615,29 @@ public void run() {
Set<Sha256Hash> ignoredFalsePositives = Sets.newHashSet();
informListenerForNewTransactions(block, newBlockType, filteredTxHashList, filteredTxn,
newStoredBlock, notFirst, registration.listener, ignoredFalsePositives);
} catch (VerificationException e) {
log.error("Block chain listener threw exception: ", e);
// Don't attempt to relay this back to the original peer thread if this was an async
// listener invocation.
// TODO: Make exception reporting a global feature and use it here.
}
}
});
}
first = false;
}

for (final ListenerRegistration<NewBestBlockListener> registration : newBestBlockListeners) {
if (registration.executor == Threading.SAME_THREAD) {
if (newBlockType == NewBlockType.BEST_CHAIN)
registration.listener.notifyNewBestBlock(newStoredBlock);
} else {
// Listener wants to be run on some other thread, so marshal it across here.
final boolean notFirst = !first;
registration.executor.execute(new Runnable() {
@Override
public void run() {
try {
if (newBlockType == NewBlockType.BEST_CHAIN)
registration.listener.notifyNewBestBlock(newStoredBlock);
} catch (VerificationException e) {
Expand All @@ -580,7 +659,7 @@ private static void informListenerForNewTransactions(Block block, NewBlockType n
@Nullable List<Sha256Hash> filteredTxHashList,
@Nullable Map<Sha256Hash, Transaction> filteredTxn,
StoredBlock newStoredBlock, boolean first,
BlockChainListener listener,
TransactionReceivedInBlockListener listener,
Set<Sha256Hash> falsePositives) throws VerificationException {
if (block.transactions != null) {
// If this is not the first wallet, ask for the transactions to be duplicated before being given
Expand Down Expand Up @@ -691,7 +770,7 @@ private void handleNewBestChain(StoredBlock storedPrev, StoredBlock newChainHead
// Now inform the listeners. This is necessary so the set of currently active transactions (that we can spend)
// can be updated to take into account the re-organize. We might also have received new coins we didn't have
// before and our previous spends might have been undone.
for (final ListenerRegistration<BlockChainListener> registration : listeners) {
for (final ListenerRegistration<ReorganizeListener> registration : reorganizeListeners) {
if (registration.executor == Threading.SAME_THREAD) {
// Short circuit the executor so we can propagate any exceptions.
// TODO: Do we really need to do this or should it be irrelevant?
Expand Down Expand Up @@ -768,7 +847,7 @@ public enum NewBlockType {
}

private static void sendTransactionsToListener(StoredBlock block, NewBlockType blockType,
BlockChainListener listener,
TransactionReceivedInBlockListener listener,
int relativityOffset,
List<Transaction> transactions,
boolean clone,
Expand Down Expand Up @@ -900,15 +979,15 @@ public Date estimateBlockTime(int height) {
*/
public ListenableFuture<StoredBlock> getHeightFuture(final int height) {
final SettableFuture<StoredBlock> result = SettableFuture.create();
addListener(new AbstractBlockChainListener() {
addNewBestBlockListener(Threading.SAME_THREAD, new NewBestBlockListener() {
@Override
public void notifyNewBestBlock(StoredBlock block) throws VerificationException {
if (block.getHeight() >= height) {
removeListener(this);
removeNewBestBlockListener(this);
result.set(block);
}
}
}, Threading.SAME_THREAD);
});
return result;
}

Expand Down

This file was deleted.

10 changes: 5 additions & 5 deletions core/src/main/java/org/bitcoinj/core/BlockChain.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public class BlockChain extends AbstractBlockChain {
* disk serialization (this is rare).</p>
*/
public BlockChain(Context context, Wallet wallet, BlockStore blockStore) throws BlockStoreException {
this(context, new ArrayList<BlockChainListener>(), blockStore);
this(context, new ArrayList<Wallet>(), blockStore);
addWallet(wallet);
}

Expand All @@ -60,24 +60,24 @@ public BlockChain(NetworkParameters params, Wallet wallet, BlockStore blockStore
* and receiving coins but rather, just want to explore the network data structures.
*/
public BlockChain(Context context, BlockStore blockStore) throws BlockStoreException {
this(context, new ArrayList<BlockChainListener>(), blockStore);
this(context, new ArrayList<Wallet>(), blockStore);
}

/** See {@link #BlockChain(Context, BlockStore)} */
public BlockChain(NetworkParameters params, BlockStore blockStore) throws BlockStoreException {
this(params, new ArrayList<BlockChainListener>(), blockStore);
this(params, new ArrayList<Wallet>(), blockStore);
}

/**
* Constructs a BlockChain connected to the given list of listeners and a store.
*/
public BlockChain(Context params, List<BlockChainListener> wallets, BlockStore blockStore) throws BlockStoreException {
public BlockChain(Context params, List<? extends Wallet> wallets, BlockStore blockStore) throws BlockStoreException {
super(params, wallets, blockStore);
this.blockStore = blockStore;
}

/** See {@link #BlockChain(Context, List, BlockStore)} */
public BlockChain(NetworkParameters params, List<BlockChainListener> wallets, BlockStore blockStore) throws BlockStoreException {
public BlockChain(NetworkParameters params, List<? extends Wallet> wallets, BlockStore blockStore) throws BlockStoreException {
this(Context.getOrCreate(params), wallets, blockStore);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.bitcoinj.core;

import org.bitcoinj.core.listeners.TransactionReceivedInBlockListener;
import org.bitcoinj.script.Script;
import org.bitcoinj.script.Script.VerifyFlag;
import org.bitcoinj.store.BlockStoreException;
Expand Down Expand Up @@ -61,7 +62,7 @@ public class FullPrunedBlockChain extends AbstractBlockChain {
* {@link Wallet#loadFromFile(java.io.File, WalletExtension...)}
*/
public FullPrunedBlockChain(Context context, Wallet wallet, FullPrunedBlockStore blockStore) throws BlockStoreException {
this(context, new ArrayList<BlockChainListener>(), blockStore);
this(context, new ArrayList<Wallet>(), blockStore);
addWallet(wallet);
}

Expand All @@ -78,7 +79,7 @@ public FullPrunedBlockChain(NetworkParameters params, Wallet wallet, FullPrunedB
* Constructs a block chain connected to the given store.
*/
public FullPrunedBlockChain(Context context, FullPrunedBlockStore blockStore) throws BlockStoreException {
this(context, new ArrayList<BlockChainListener>(), blockStore);
this(context, new ArrayList<Wallet>(), blockStore);
}

/**
Expand All @@ -91,7 +92,7 @@ public FullPrunedBlockChain(NetworkParameters params, FullPrunedBlockStore block
/**
* Constructs a block chain connected to the given list of wallets and a store.
*/
public FullPrunedBlockChain(Context context, List<BlockChainListener> listeners, FullPrunedBlockStore blockStore) throws BlockStoreException {
public FullPrunedBlockChain(Context context, List<Wallet> listeners, FullPrunedBlockStore blockStore) throws BlockStoreException {
super(context, listeners, blockStore);
this.blockStore = blockStore;
// Ignore upgrading for now
Expand All @@ -101,7 +102,7 @@ public FullPrunedBlockChain(Context context, List<BlockChainListener> listeners,
/**
* See {@link #FullPrunedBlockChain(Context, List, FullPrunedBlockStore)}
*/
public FullPrunedBlockChain(NetworkParameters params, List<BlockChainListener> listeners,
public FullPrunedBlockChain(NetworkParameters params, List<Wallet> listeners,
FullPrunedBlockStore blockStore) throws BlockStoreException {
this(Context.getOrCreate(params), listeners, blockStore);
}
Expand Down

0 comments on commit ecbd021

Please sign in to comment.