Skip to content

Commit

Permalink
Refactor bloom filter calculations out of PeerGroup into a separate F…
Browse files Browse the repository at this point in the history
…ilterMerger class.
  • Loading branch information
mikehearn committed May 7, 2014
1 parent 26823d1 commit 46ad86a
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 57 deletions.
83 changes: 26 additions & 57 deletions core/src/main/java/com/google/bitcoin/core/PeerGroup.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.google.bitcoin.net.BlockingClientManager;
import com.google.bitcoin.net.ClientConnectionManager;
import com.google.bitcoin.net.FilterMerger;
import com.google.bitcoin.net.NioClientManager;
import com.google.bitcoin.net.discovery.PeerDiscovery;
import com.google.bitcoin.net.discovery.PeerDiscoveryException;
Expand All @@ -29,6 +30,7 @@
import com.google.bitcoin.utils.Threading;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
Expand Down Expand Up @@ -138,7 +140,7 @@ public List<Message> getData(Peer peer, GetDataMessage m) {
@Override
public void onBlocksDownloaded(Peer peer, Block block, int blocksLeft) {
double rate = checkNotNull(chain).getFalsePositiveRate();
if (rate > bloomFilterFPRate * MAX_FP_RATE_INCREASE) {
if (rate > bloomFilterMerger.getBloomFilterFPRate() * MAX_FP_RATE_INCREASE) {
log.info("Force update Bloom filter due to high false positive rate");
recalculateFastCatchupAndFilter(FilterRecalculateMode.FORCE_SEND);
}
Expand Down Expand Up @@ -240,8 +242,6 @@ public void onPeerDisconnected(Peer peer, int peerCount) {
// Visible for testing
PeerEventListener startupListener = new PeerStartupListener();

// A bloom filter generated from all connected wallets that is given to new peers
private BloomFilter bloomFilter;
/**
* <p>A reasonable default for the bloom filter false positive rate on mainnet. FP rates are values between 0.0 and 1.0
* where 1.0 is "all transactions" i.e. 100%.</p>
Expand All @@ -252,11 +252,9 @@ public void onPeerDisconnected(Peer peer, int peerCount) {
public static final double DEFAULT_BLOOM_FILTER_FP_RATE = 0.0005;
/** Maximum increase in FP rate before forced refresh of the bloom filter */
public static final double MAX_FP_RATE_INCREASE = 2.0f;
// The false positive rate for bloomFilter
private double bloomFilterFPRate = DEFAULT_BLOOM_FILTER_FP_RATE;
// We use a constant tweak to avoid giving up privacy when we regenerate our filter with new keys
private final long bloomFilterTweak = (long) (Math.random() * Long.MAX_VALUE);
private int lastBloomFilterElementCount;
// An object that calculates bloom filters given a list of filter providers, whilst tracking some state useful
// for privacy purposes.
private FilterMerger bloomFilterMerger;

/** The default timeout between when a connection attempt begins and version message exchange completes */
public static final int DEFAULT_CONNECT_TIMEOUT_MILLIS = 5000;
Expand Down Expand Up @@ -357,6 +355,7 @@ public int compare(PeerAddress a, PeerAddress b) {
peerDiscoverers = new CopyOnWriteArraySet<PeerDiscovery>();
peerEventListeners = new CopyOnWriteArrayList<ListenerRegistration<PeerEventListener>>();
runningBroadcasts = Collections.synchronizedSet(new HashSet<TransactionBroadcast>());
bloomFilterMerger = new FilterMerger(DEFAULT_BLOOM_FILTER_FP_RATE);
}

/**
Expand Down Expand Up @@ -864,54 +863,24 @@ public void recalculateFastCatchupAndFilter(FilterRecalculateMode mode) {
if (chain != null && chain.shouldVerifyTransactions())
return;
log.info("Recalculating filter in mode {}", mode);
long earliestKeyTimeSecs = Long.MAX_VALUE;
int elements = 0;
boolean requiresUpdateAll = false;
for (PeerFilterProvider p : peerFilterProviders) {
earliestKeyTimeSecs = Math.min(earliestKeyTimeSecs, p.getEarliestKeyCreationTime());
elements += p.getBloomFilterElementCount();
requiresUpdateAll = requiresUpdateAll || p.isRequiringUpdateAllBloomFilter();
FilterMerger.Result result = bloomFilterMerger.calculate(ImmutableList.copyOf(peerFilterProviders));
boolean send;
switch (mode) {
case SEND_IF_CHANGED: send = result.changed; break;
case DONT_SEND: send = false; break;
case FORCE_SEND: send = true; break;
default: throw new UnsupportedOperationException();
}

if (elements > 0) {
// We stair-step our element count so that we avoid creating a filter with different parameters
// as much as possible as that results in a loss of privacy.
// The constant 100 here is somewhat arbitrary, but makes sense for small to medium wallets -
// it will likely mean we never need to create a filter with different parameters.
lastBloomFilterElementCount = elements > lastBloomFilterElementCount ? elements + 100 : lastBloomFilterElementCount;
BloomFilter.BloomUpdate bloomFlags =
requiresUpdateAll ? BloomFilter.BloomUpdate.UPDATE_ALL : BloomFilter.BloomUpdate.UPDATE_P2PUBKEY_ONLY;
BloomFilter filter = new BloomFilter(lastBloomFilterElementCount, bloomFilterFPRate, bloomFilterTweak, bloomFlags);
for (PeerFilterProvider p : peerFilterProviders)
filter.merge(p.getBloomFilter(lastBloomFilterElementCount, bloomFilterFPRate, bloomFilterTweak));

boolean changed = !filter.equals(bloomFilter);
boolean send = false;

bloomFilter = filter;

switch (mode) {
case SEND_IF_CHANGED: send = changed; break;
case DONT_SEND: send = false; break;
case FORCE_SEND: send = true; break;
}

if (send) {
for (Peer peer : peers)
peer.setBloomFilter(filter);
// Reset the false positive estimate so that we don't send a flood of filter updates
// if the estimate temporarily overshoots our threshold.
if (chain != null)
chain.resetFalsePositiveEstimate();
}
if (send) {
for (Peer peer : peers)
peer.setBloomFilter(result.filter);
// Reset the false positive estimate so that we don't send a flood of filter updates
// if the estimate temporarily overshoots our threshold.
if (chain != null)
chain.resetFalsePositiveEstimate();
}
// Now adjust the earliest key time backwards by a week to handle the case of clock drift. This can occur
// both in block header timestamps and if the users clock was out of sync when the key was first created
// (to within a small amount of tolerance).
earliestKeyTimeSecs -= 86400 * 7;

// Do this last so that bloomFilter is already set when it gets called.
setFastCatchupTimeSecs(earliestKeyTimeSecs);
setFastCatchupTimeSecs(result.earliestKeyTimeSecs);
} finally {
lock.unlock();
}
Expand All @@ -929,7 +898,7 @@ public void recalculateFastCatchupAndFilter(FilterRecalculateMode mode) {
public void setBloomFilterFalsePositiveRate(double bloomFilterFPRate) {
lock.lock();
try {
this.bloomFilterFPRate = bloomFilterFPRate;
bloomFilterMerger.setBloomFilterFPRate(bloomFilterFPRate);
recalculateFastCatchupAndFilter(FilterRecalculateMode.SEND_IF_CHANGED);
} finally {
lock.unlock();
Expand Down Expand Up @@ -1070,7 +1039,7 @@ protected void handleNewPeer(final Peer peer) {
// Give the peer a filter that can be used to probabilistically drop transactions that
// aren't relevant to our wallet. We may still receive some false positives, which is
// OK because it helps improve wallet privacy. Old nodes will just ignore the message.
if (bloomFilter != null) peer.setBloomFilter(bloomFilter);
if (bloomFilterMerger.getLastFilter() != null) peer.setBloomFilter(bloomFilterMerger.getLastFilter());
// Link the peer to the memory pool so broadcast transactions have their confidence levels updated.
peer.setDownloadData(false);
// TODO: The peer should calculate the fast catchup time from the added wallets here.
Expand Down Expand Up @@ -1183,7 +1152,7 @@ private void setDownloadPeer(@Nullable Peer peer) {
if (downloadListener != null)
peer.addEventListener(downloadListener, Threading.SAME_THREAD);
downloadPeer.setDownloadData(true);
downloadPeer.setDownloadParameters(fastCatchupTimeSecs, bloomFilter != null);
downloadPeer.setDownloadParameters(fastCatchupTimeSecs, bloomFilterMerger.getLastFilter() != null);
}
} finally {
lock.unlock();
Expand Down Expand Up @@ -1211,7 +1180,7 @@ public void setFastCatchupTimeSecs(long secondsSinceEpoch) {
Preconditions.checkState(chain == null || !chain.shouldVerifyTransactions(), "Fast catchup is incompatible with fully verifying");
fastCatchupTimeSecs = secondsSinceEpoch;
if (downloadPeer != null) {
downloadPeer.setDownloadParameters(secondsSinceEpoch, bloomFilter != null);
downloadPeer.setDownloadParameters(secondsSinceEpoch, bloomFilterMerger.getLastFilter() != null);
}
} finally {
lock.unlock();
Expand Down
80 changes: 80 additions & 0 deletions core/src/main/java/com/google/bitcoin/net/FilterMerger.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package com.google.bitcoin.net;

import com.google.bitcoin.core.BloomFilter;
import com.google.bitcoin.core.PeerFilterProvider;
import com.google.common.collect.ImmutableList;

// This code is unit tested by the PeerGroup tests.

/**
* <p>A reusable object that will calculate, given a list of {@link com.google.bitcoin.core.PeerFilterProvider}s, a merged
* {@link com.google.bitcoin.core.BloomFilter} and earliest key time for all of them.
* Used by the {@link com.google.bitcoin.core.PeerGroup} class internally.</p>
*
* <p>Thread safety: this class tracks the element count of the last filter it calculated and so must be synchronised
* externally or used from only one thread. It will acquire a lock on each filter in turn before performing the
* calculation because the providers may be mutated in other threads in parallel, but global consistency is required
* to produce a merged filter.</p>
*/
public class FilterMerger {
// We use a constant tweak to avoid giving up privacy when we regenerate our filter with new keys
private final long bloomFilterTweak = (long) (Math.random() * Long.MAX_VALUE);
private double bloomFilterFPRate;
private int lastBloomFilterElementCount;
private BloomFilter lastFilter;

public FilterMerger(double bloomFilterFPRate) {
this.bloomFilterFPRate = bloomFilterFPRate;
}

public static class Result {
public BloomFilter filter;
public long earliestKeyTimeSecs;
public boolean changed;
}

public Result calculate(ImmutableList<PeerFilterProvider> providers) {
Result result = new Result();
result.earliestKeyTimeSecs = Long.MAX_VALUE;
int elements = 0;
boolean requiresUpdateAll = false;
for (PeerFilterProvider p : providers) {
result.earliestKeyTimeSecs = Math.min(result.earliestKeyTimeSecs, p.getEarliestKeyCreationTime());
elements += p.getBloomFilterElementCount();
requiresUpdateAll = requiresUpdateAll || p.isRequiringUpdateAllBloomFilter();
}

if (elements > 0) {
// We stair-step our element count so that we avoid creating a filter with different parameters
// as much as possible as that results in a loss of privacy.
// The constant 100 here is somewhat arbitrary, but makes sense for small to medium wallets -
// it will likely mean we never need to create a filter with different parameters.
lastBloomFilterElementCount = elements > lastBloomFilterElementCount ? elements + 100 : lastBloomFilterElementCount;
BloomFilter.BloomUpdate bloomFlags =
requiresUpdateAll ? BloomFilter.BloomUpdate.UPDATE_ALL : BloomFilter.BloomUpdate.UPDATE_P2PUBKEY_ONLY;
BloomFilter filter = new BloomFilter(lastBloomFilterElementCount, bloomFilterFPRate, bloomFilterTweak, bloomFlags);
for (PeerFilterProvider p : providers)
filter.merge(p.getBloomFilter(lastBloomFilterElementCount, bloomFilterFPRate, bloomFilterTweak));

result.changed = !filter.equals(lastFilter);
result.filter = lastFilter = filter;
}
// Now adjust the earliest key time backwards by a week to handle the case of clock drift. This can occur
// both in block header timestamps and if the users clock was out of sync when the key was first created
// (to within a small amount of tolerance).
result.earliestKeyTimeSecs -= 86400 * 7;
return result;
}

public void setBloomFilterFPRate(double bloomFilterFPRate) {
this.bloomFilterFPRate = bloomFilterFPRate;
}

public double getBloomFilterFPRate() {
return bloomFilterFPRate;
}

public BloomFilter getLastFilter() {
return lastFilter;
}
}

0 comments on commit 46ad86a

Please sign in to comment.