Skip to content

Commit

Permalink
Merge branch 'feature/1138-event-listener-improve' into feature/move-…
Browse files Browse the repository at this point in the history
…samples-to-module
  • Loading branch information
e.shevchenko committed Oct 9, 2018
2 parents 388d574 + f0bf19a commit daa0155
Show file tree
Hide file tree
Showing 30 changed files with 89 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
Expand All @@ -51,7 +52,7 @@ public class DbFlushManager {

List<AbstractCachedSource<byte[], ?>> writeCaches = new CopyOnWriteArrayList<>();
List<Source<byte[], ?>> sources = new CopyOnWriteArrayList<>();
Set<DbSource> dbSources ;
Set<DbSource> dbSources = new HashSet<>();
AbstractCachedSource<byte[], byte[]> stateDbCache;

long sizeThreshold;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
import java.util.function.Consumer;

import static org.ethereum.publish.Subscription.to;
import static org.ethereum.publish.event.Events.Type.BLOCK_ADED;
import static org.ethereum.publish.event.Events.Type.BLOCK_ADDED;
import static org.ethereum.util.ByteUtil.toHexString;

/**
Expand Down Expand Up @@ -122,7 +122,7 @@ public EthereumImpl(final SystemProperties config) {

@PostConstruct
public void init() {
worldManager.subscribe(to(BLOCK_ADED, data -> gasPriceTracker.onBlock(data.getBlockSummary())));
worldManager.subscribe(to(BLOCK_ADDED, data -> gasPriceTracker.onBlock(data.getBlockSummary())));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public void onSendMessage(Channel channel, Message message) {
@Override
public void onBlock(BlockSummary blockSummary) {
compositeListener.onBlock(blockSummary);
publisher.publish(Events.onBlockAdded(blockSummary));
publisher.publish(Events.onBlockAdded(blockSummary, false));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import static org.ethereum.sync.BlockDownloader.MAX_IN_REQUEST;

/**
* @deprecated This component uses deprecated {@link EthereumListenerAdapter}, use {@link BlockReplayer} instead.
*
* Class capable of replaying stored blocks prior to 'going online' and
* notifying on newly imported blocks
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@

import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toList;
import static org.ethereum.publish.event.Events.Type.BLOCK_ADED;
import static org.ethereum.publish.event.Events.Type.BLOCK_ADDED;

/**
* Class capable of replaying stored blocks prior to 'going online' and
Expand Down Expand Up @@ -198,7 +198,7 @@ public BlockReplayer replayAsyncAt(Ethereum ethereum) {
}

BlockReplayer blockReplay = build();
ethereum.subscribe(BLOCK_ADED, blockReplay::onBlock);
ethereum.subscribe(BLOCK_ADDED, blockReplay::onBlock);
new Thread(() -> blockReplay.replay()).start();

return blockReplay;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class Subscription<E extends Event<D>, D> {
private final static Logger log = LoggerFactory.getLogger("event");

/**
*
* Abstraction that helps manage subscription state
*/
public static class LifeCycle {
private final Subscription subscription;
Expand All @@ -49,7 +49,7 @@ private LifeCycle(Subscription subscription) {
}

/**
*
* Unsubscribes owner's subscription from current event flow.
*/
public void unsubscribe() {
subscription.unsubscribeAfter(data -> true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public interface Type {

Class<? extends Event<PeerHandshaked.Data>> PEER_HANDSHAKED = PeerHandshaked.class;

Class<? extends Event<BlockAdded.Data>> BLOCK_ADED = BlockAdded.class;
Class<? extends Event<BlockAdded.Data>> BLOCK_ADDED = BlockAdded.class;

Class<? extends Event<Node>> NODE_DISCOVERED = NodeDiscovered.class;

Expand Down Expand Up @@ -89,10 +89,6 @@ public static Event onBlockAdded(BlockSummary summary, boolean isBest) {
return new BlockAdded(summary, isBest);
}

public static Event onBlockAdded(BlockSummary summary) {
return onBlockAdded(summary, false);
}

public static Event onNodeDiscovered(Node node) {
return new NodeDiscovered(node);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,10 @@
*/
package org.ethereum.util.blockchain;

import org.ethereum.config.BlockchainConfig;
import org.ethereum.config.BlockchainNetConfig;
import org.ethereum.config.SystemProperties;
import org.ethereum.config.blockchain.ByzantiumConfig;
import org.ethereum.config.blockchain.DaoHFConfig;
import org.ethereum.config.blockchain.DaoNoHFConfig;
import org.ethereum.config.blockchain.FrontierConfig;
import org.ethereum.config.blockchain.HomesteadConfig;
import org.ethereum.core.*;
import org.ethereum.core.genesis.GenesisLoader;
Expand Down Expand Up @@ -61,7 +58,7 @@
import java.util.concurrent.ExecutionException;

import static org.ethereum.publish.Subscription.to;
import static org.ethereum.publish.event.Events.Type.BLOCK_ADED;
import static org.ethereum.publish.event.Events.Type.BLOCK_ADDED;
import static org.ethereum.util.ByteUtil.wrap;

/**
Expand Down Expand Up @@ -450,7 +447,7 @@ public BlockchainImpl getBlockchain() {
if (blockchain == null) {
blockchain = createBlockchain(genesis);
blockchain.setMinerCoinbase(coinbase);
subscribe(to(BLOCK_ADED, data -> lastSummary = data.getBlockSummary()));
subscribe(to(BLOCK_ADDED, data -> lastSummary = data.getBlockSummary()));
}
return blockchain;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

import java.util.Random;

import static org.ethereum.publish.event.Events.Type.BLOCK_ADED;
import static org.ethereum.publish.event.Events.Type.BLOCK_ADDED;
import static org.ethereum.publish.event.Events.Type.SYNC_DONE;
import static org.ethereum.validator.EthashRule.ChainType.main;
import static org.ethereum.validator.EthashRule.ChainType.reverse;
Expand Down Expand Up @@ -107,7 +107,7 @@ public EthashRule(Mode mode, ChainType chain, Publisher publisher) {
if (this.chain == main && publisher != null) {
publisher
.subscribe(SYNC_DONE, ss -> EthashRule.this.syncDone = true)
.subscribe(BLOCK_ADED, data -> {
.subscribe(BLOCK_ADDED, data -> {
if (data.isBest()) {
ethashHelper.preCache(data.getBlockSummary().getBlock().getNumber());
}
Expand Down
4 changes: 2 additions & 2 deletions ethereumj-core/src/test/java/org/ethereum/core/CloseTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

import static org.ethereum.publish.event.Events.Type.BLOCK_ADED;
import static org.ethereum.publish.event.Events.Type.BLOCK_ADDED;

/**
* Created by Anton Nashatyrev on 24.06.2016.
Expand All @@ -43,7 +43,7 @@ public void relaunchTest() throws InterruptedException {
Assert.assertNotNull(bestBlock);
final CountDownLatch latch = new CountDownLatch(1);
AtomicInteger counter = new AtomicInteger();
ethereum.subscribe(BLOCK_ADED, data -> {
ethereum.subscribe(BLOCK_ADDED, data -> {
if (counter.addAndGet(1) > 1100) {
latch.countDown();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.ethereum.config.SystemProperties;
import org.ethereum.crypto.ECKey;
import org.ethereum.db.ByteArrayWrapper;
import org.ethereum.listener.EthereumListener;
import org.ethereum.publish.event.BlockAdded;
import org.ethereum.publish.event.PendingTransactionUpdated;
import org.ethereum.util.blockchain.SolidityContract;
Expand All @@ -47,7 +46,7 @@
import static org.ethereum.core.PendingTransaction.State.INCLUDED;
import static org.ethereum.core.PendingTransaction.State.NEW_PENDING;
import static org.ethereum.core.PendingTransaction.State.PENDING;
import static org.ethereum.publish.event.Events.Type.BLOCK_ADED;
import static org.ethereum.publish.event.Events.Type.BLOCK_ADDED;
import static org.ethereum.publish.event.Events.Type.PENDING_STATE_CHANGED;
import static org.ethereum.publish.event.Events.Type.PENDING_TRANSACTION_UPDATED;
import static org.ethereum.publish.Subscription.to;
Expand Down Expand Up @@ -123,7 +122,7 @@ public Triple<TransactionReceipt, PendingTransaction.State, Block> pollTxUpdate(
public void testSimple() throws InterruptedException {
PendingListener l = new PendingListener();
StandaloneBlockchain bc = new StandaloneBlockchain()
.subscribe(to(BLOCK_ADED, l::onBlock))
.subscribe(to(BLOCK_ADDED, l::onBlock))
.subscribe(to(PENDING_STATE_CHANGED, l::onPendingStateChanged))
.subscribe(to(PENDING_TRANSACTION_UPDATED, l::onPendingTransactionUpdate));

Expand Down Expand Up @@ -198,7 +197,7 @@ public void testSimple() throws InterruptedException {
public void testRebranch1() throws InterruptedException {
PendingListener l = new PendingListener();
StandaloneBlockchain bc = new StandaloneBlockchain()
.subscribe(to(BLOCK_ADED, l::onBlock))
.subscribe(to(BLOCK_ADDED, l::onBlock))
.subscribe(to(PENDING_STATE_CHANGED, l::onPendingStateChanged))
.subscribe(to(PENDING_TRANSACTION_UPDATED, l::onPendingTransactionUpdate));

Expand Down Expand Up @@ -267,7 +266,7 @@ public void testRebranch1() throws InterruptedException {
public void testRebranch2() throws InterruptedException {
PendingListener l = new PendingListener();
StandaloneBlockchain bc = new StandaloneBlockchain()
.subscribe(to(BLOCK_ADED, l::onBlock))
.subscribe(to(BLOCK_ADDED, l::onBlock))
.subscribe(to(PENDING_STATE_CHANGED, l::onPendingStateChanged))
.subscribe(to(PENDING_TRANSACTION_UPDATED, l::onPendingTransactionUpdate));

Expand Down Expand Up @@ -357,7 +356,7 @@ public void testRebranch2() throws InterruptedException {
public void testRebranch3() throws InterruptedException {
PendingListener l = new PendingListener();
StandaloneBlockchain bc = new StandaloneBlockchain()
.subscribe(to(BLOCK_ADED, l::onBlock))
.subscribe(to(BLOCK_ADDED, l::onBlock))
.subscribe(to(PENDING_STATE_CHANGED, l::onPendingStateChanged))
.subscribe(to(PENDING_TRANSACTION_UPDATED, l::onPendingTransactionUpdate));

Expand Down Expand Up @@ -409,7 +408,7 @@ public void testRebranch3() throws InterruptedException {
public void testOldBlockIncluded() throws InterruptedException {
PendingListener l = new PendingListener();
StandaloneBlockchain bc = new StandaloneBlockchain()
.subscribe(to(BLOCK_ADED, l::onBlock))
.subscribe(to(BLOCK_ADDED, l::onBlock))
.subscribe(to(PENDING_STATE_CHANGED, l::onPendingStateChanged))
.subscribe(to(PENDING_TRANSACTION_UPDATED, l::onPendingTransactionUpdate));

Expand Down Expand Up @@ -447,7 +446,7 @@ public void testOldBlockIncluded() throws InterruptedException {
public void testBlockOnlyIncluded() throws InterruptedException {
PendingListener l = new PendingListener();
StandaloneBlockchain bc = new StandaloneBlockchain()
.subscribe(to(BLOCK_ADED, l::onBlock))
.subscribe(to(BLOCK_ADDED, l::onBlock))
.subscribe(to(PENDING_STATE_CHANGED, l::onPendingStateChanged))
.subscribe(to(PENDING_TRANSACTION_UPDATED, l::onPendingTransactionUpdate));

Expand Down Expand Up @@ -476,7 +475,7 @@ public void testBlockOnlyIncluded() throws InterruptedException {
public void testTrackTx1() throws InterruptedException {
PendingListener l = new PendingListener();
StandaloneBlockchain bc = new StandaloneBlockchain()
.subscribe(to(BLOCK_ADED, l::onBlock))
.subscribe(to(BLOCK_ADDED, l::onBlock))
.subscribe(to(PENDING_STATE_CHANGED, l::onPendingStateChanged))
.subscribe(to(PENDING_TRANSACTION_UPDATED, l::onPendingTransactionUpdate));

Expand Down Expand Up @@ -530,7 +529,7 @@ public void testPrevBlock() throws InterruptedException {

PendingListener l = new PendingListener();
bc
.subscribe(to(BLOCK_ADED, l::onBlock))
.subscribe(to(BLOCK_ADDED, l::onBlock))
.subscribe(to(PENDING_STATE_CHANGED, l::onPendingStateChanged))
.subscribe(to(PENDING_TRANSACTION_UPDATED, l::onPendingTransactionUpdate));

Expand All @@ -547,7 +546,7 @@ public void testPrevBlock() throws InterruptedException {
public void testTrackTx2() throws InterruptedException {
PendingListener l = new PendingListener();
StandaloneBlockchain bc = new StandaloneBlockchain()
.subscribe(to(BLOCK_ADED, l::onBlock))
.subscribe(to(BLOCK_ADDED, l::onBlock))
.subscribe(to(PENDING_STATE_CHANGED, l::onPendingStateChanged))
.subscribe(to(PENDING_TRANSACTION_UPDATED, l::onPendingTransactionUpdate));

Expand Down Expand Up @@ -580,7 +579,7 @@ public void testTrackTx2() throws InterruptedException {
public void testRejected1() throws InterruptedException {
PendingListener l = new PendingListener();
StandaloneBlockchain bc = new StandaloneBlockchain()
.subscribe(to(BLOCK_ADED, l::onBlock))
.subscribe(to(BLOCK_ADDED, l::onBlock))
.subscribe(to(PENDING_STATE_CHANGED, l::onPendingStateChanged))
.subscribe(to(PENDING_TRANSACTION_UPDATED, l::onPendingTransactionUpdate));

Expand Down Expand Up @@ -629,7 +628,7 @@ public void testIncludedRejected() throws InterruptedException {
// the transaction becomes the main chain
PendingListener l = new PendingListener();
StandaloneBlockchain bc = new StandaloneBlockchain()
.subscribe(to(BLOCK_ADED, l::onBlock))
.subscribe(to(BLOCK_ADDED, l::onBlock))
.subscribe(to(PENDING_STATE_CHANGED, l::onPendingStateChanged))
.subscribe(to(PENDING_TRANSACTION_UPDATED, l::onPendingTransactionUpdate));

Expand Down Expand Up @@ -683,7 +682,7 @@ public void onPendingTransactionUpdate(TransactionReceipt txReceipt, PendingTran
};

StandaloneBlockchain bc = new StandaloneBlockchain()
.subscribe(to(BLOCK_ADED, l::onBlock))
.subscribe(to(BLOCK_ADDED, l::onBlock))
.subscribe(to(PENDING_STATE_CHANGED, l::onPendingStateChanged))
.subscribe(to(PENDING_TRANSACTION_UPDATED, l::onPendingTransactionUpdate));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
import java.util.Vector;

import static java.lang.Thread.sleep;
import static org.ethereum.publish.event.Events.Type.BLOCK_ADED;
import static org.ethereum.publish.event.Events.Type.BLOCK_ADDED;
import static org.ethereum.publish.event.Events.Type.ETH_STATUS_UPDATED;
import static org.ethereum.publish.event.Events.Type.PEER_ADDED_TO_SYNC_POOL;
import static org.ethereum.publish.event.Events.Type.SYNC_DONE;
Expand Down Expand Up @@ -110,7 +110,7 @@ private void springInit() {
// adding the main EthereumJ callback to be notified on different kind of events
this.ethereum
.subscribe(SYNC_DONE, this::onSyncDone)
.subscribe(BLOCK_ADED, this::onBlock)
.subscribe(BLOCK_ADDED, this::onBlock)
.subscribe(ETH_STATUS_UPDATED, this::onEthStatusUpdated)
.subscribe(PEER_ADDED_TO_SYNC_POOL, this::onPeerAddedToSyncPool);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
import static java.lang.Thread.sleep;
import static org.ethereum.core.PendingTransaction.State.NEW_PENDING;
import static org.ethereum.publish.Subscription.to;
import static org.ethereum.publish.event.Events.Type.BLOCK_ADED;
import static org.ethereum.publish.event.Events.Type.BLOCK_ADDED;
import static org.ethereum.publish.event.Events.Type.PENDING_TRANSACTION_UPDATED;

/**
Expand Down Expand Up @@ -228,7 +228,7 @@ public RegularNode() {
public void run() {
try {
this.ethereum
.subscribe(to(BLOCK_ADED, this::onBlock))
.subscribe(to(BLOCK_ADDED, this::onBlock))
.subscribe(to(PENDING_TRANSACTION_UPDATED, this::onPendingTransactionUpdated)
.conditionally(data -> data.getState() == NEW_PENDING));

Expand Down
6 changes: 3 additions & 3 deletions ethereumj-core/src/test/java/org/ethereum/mine/MinerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@

import static org.ethereum.core.PendingTransaction.State.NEW_PENDING;
import static org.ethereum.publish.Subscription.to;
import static org.ethereum.publish.event.Events.Type.BLOCK_ADED;
import static org.ethereum.publish.event.Events.Type.BLOCK_ADDED;
import static org.ethereum.publish.event.Events.Type.PENDING_TRANSACTION_UPDATED;
import static org.ethereum.publish.event.Events.Type.SYNC_DONE;

Expand Down Expand Up @@ -108,7 +108,7 @@ public void startMiningConsumer() throws Exception {
Ethereum ethereum2 = EthereumFactory.createEthereum(SysPropConfig2.props, SysPropConfig2.class);

final CountDownLatch semaphore = new CountDownLatch(1);
ethereum2.subscribe(BLOCK_ADED, data -> {
ethereum2.subscribe(BLOCK_ADDED, data -> {
Block block = data.getBlockSummary().getBlock();
System.err.println("=== New block: " + blockInfo(block));
System.err.println(block);
Expand Down Expand Up @@ -207,7 +207,7 @@ public void startMiningTest() throws FileNotFoundException, InterruptedException

final CountDownLatch semaphore = new CountDownLatch(1);
ethereum1
.subscribe(BLOCK_ADED, data -> System.out.println("=== New block: " + blockInfo(data.getBlockSummary().getBlock())))
.subscribe(BLOCK_ADDED, data -> System.out.println("=== New block: " + blockInfo(data.getBlockSummary().getBlock())))
.subscribe(SYNC_DONE, syncState -> semaphore.countDown());

// ethereum2.addListener(new EthereumListenerAdapter() {
Expand Down

0 comments on commit daa0155

Please sign in to comment.