Skip to content

Commit

Permalink
Merge 51ac339 into 561582b
Browse files Browse the repository at this point in the history
  • Loading branch information
mkalinin committed Oct 26, 2018
2 parents 561582b + 51ac339 commit bd8bbcb
Show file tree
Hide file tree
Showing 9 changed files with 361 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -726,7 +726,6 @@ private boolean isValid(Repository repo, Block block) {
boolean isValid = true;

if (!block.isGenesis()) {
isValid = isValid(block.getHeader());

// Sanity checks
String trieHash = toHexString(block.getTxTrieRoot());
Expand Down
23 changes: 21 additions & 2 deletions ethereumj-core/src/main/java/org/ethereum/manager/BlockLoader.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.ethereum.db.DbFlushManager;
import org.ethereum.util.ExecutorPipeline;
import org.ethereum.validator.BlockHeaderValidator;
import org.ethereum.validator.DependentBlockHeaderRule;
import org.ethereum.validator.ParentBlockHeaderValidator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.spongycastle.util.encoders.Hex;
Expand Down Expand Up @@ -57,15 +59,18 @@ default void close() throws IOException {
private final static DateTimeFormatter df = DateTimeFormatter.ofPattern("HH:mm:ss.SSSS");

private final BlockHeaderValidator headerValidator;
private final ParentBlockHeaderValidator parentHeaderValidator;
private final Blockchain blockchain;
private final DbFlushManager dbFlushManager;

private ExecutorPipeline<Block, Block> exec1;
private ExecutorPipeline<Block, ?> exec2;

@Autowired
public BlockLoader(BlockHeaderValidator headerValidator, Blockchain blockchain, DbFlushManager dbFlushManager) {
public BlockLoader(BlockHeaderValidator headerValidator, Blockchain blockchain, DbFlushManager dbFlushManager,
ParentBlockHeaderValidator parentHeaderValidator) {
this.headerValidator = headerValidator;
this.parentHeaderValidator = parentHeaderValidator;
this.blockchain = blockchain;
this.dbFlushManager = dbFlushManager;
}
Expand Down Expand Up @@ -177,7 +182,21 @@ public boolean loadBlocks(Path... paths) {
}

private boolean isValid(BlockHeader header) {
return headerValidator.validateAndLog(header, logger);
return isParentValid(header) && headerValidator.validateAndLog(header, logger);
}

private boolean isParentValid(BlockHeader header) {
Block parent = blockchain.getBlockByHash(header.getParentHash());
if (parent == null) {
return true;
}
boolean valid = parentHeaderValidator.validate(header, parent.getHeader());

if (!valid) {
parentHeaderValidator.logErrors(logger);
}

return valid;
}

private class HexLineDumpWalker implements DumpWalker {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.commons.collections4.queue.CircularFifoQueue;
import org.ethereum.core.*;
import org.ethereum.crypto.HashUtil;
import org.ethereum.net.server.Channel;
import org.ethereum.validator.BlockHeaderValidator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.spongycastle.util.encoders.Hex;

import java.util.*;
import java.util.concurrent.*;
Expand Down Expand Up @@ -388,20 +390,41 @@ private boolean validateAndAddHeaders(List<BlockHeader> headers, byte[] nodeId)
wrappers.add(new BlockHeaderWrapper(header, nodeId));
}

SyncQueueIfc.ValidatedHeaders res;
synchronized (this) {
List<BlockHeaderWrapper> headersReady = syncQueue.addHeaders(wrappers);
if (headersReady != null && !headersReady.isEmpty()) {
pushHeaders(headersReady);
res = syncQueue.addHeadersAndValidate(wrappers);
if (res.isValid() && res.getHeaders() != null && !res.getHeaders().isEmpty()) {
pushHeaders(res.getHeaders());
}
}

dropIfValidationFailed(res);

receivedHeadersLatch.countDown();

logger.debug("{}: {} headers added", name, headers.size());

return true;
}

/**
* Checks whether validation has been passed correctly or not
* and drops misleading peer if it hasn't
*/
protected void dropIfValidationFailed(SyncQueueIfc.ValidatedHeaders res) {
if (!res.isValid() && res.getNodeId() != null) {
if (logger.isWarnEnabled()) logger.warn("Invalid header received: {}, reason: {}, peer: {}",
res.getHeader() == null ? "" : res.getHeader().getShortDescr(),
res.getReason(),
Hex.toHexString(res.getNodeId()).substring(0, 8));

Channel peer = pool.getByNodeId(res.getNodeId());
if (peer != null) {
peer.dropConnection();
}
}
}

/**
* Runs checks against block's header. <br>
* All these checks make sense before block is added to queue
Expand Down
11 changes: 9 additions & 2 deletions ethereumj-core/src/main/java/org/ethereum/sync/SyncManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.ethereum.net.server.ChannelManager;
import org.ethereum.util.ExecutorPipeline;
import org.ethereum.validator.BlockHeaderValidator;
import org.ethereum.validator.DependentBlockHeaderRule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
Expand Down Expand Up @@ -91,6 +92,9 @@ public void accept(BlockWrapper blockWrapper) {
@Autowired
private FastSyncManager fastSyncManager;

@Autowired
private DependentBlockHeaderRule parentHeaderValidator;

ChannelManager channelManager;

private SystemProperties config;
Expand Down Expand Up @@ -161,7 +165,8 @@ void initRegularSync(EthereumListener.SyncState syncDoneType) {
logger.info("Initializing SyncManager regular sync.");
this.syncDoneType = syncDoneType;

syncQueue = new SyncQueueImpl(blockchain);
syncQueue = new SyncQueueImpl(blockchain)
.withParentHeaderValidator(parentHeaderValidator);
super.init(syncQueue, pool, "RegularSync");

Runnable queueProducer = this::produceQueue;
Expand Down Expand Up @@ -393,7 +398,9 @@ public boolean validateAndAddNewBlock(Block block, byte[] nodeId) {
}

logger.debug("Adding new block to sync queue: " + block.getShortDescr());
syncQueue.addHeaders(singletonList(new BlockHeaderWrapper(block.getHeader(), nodeId)));
SyncQueueIfc.ValidatedHeaders res = syncQueue.addHeadersAndValidate(
singletonList(new BlockHeaderWrapper(block.getHeader(), nodeId)));
dropIfValidationFailed(res);

synchronized (this) {
List<Block> newBlocks = syncQueue.addBlocks(singletonList(block));
Expand Down
65 changes: 65 additions & 0 deletions ethereumj-core/src/main/java/org/ethereum/sync/SyncQueueIfc.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@
package org.ethereum.sync;

import org.ethereum.core.Block;
import org.ethereum.core.BlockHeader;
import org.ethereum.core.BlockHeaderWrapper;

import javax.annotation.Nullable;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

/**
Expand Down Expand Up @@ -55,6 +58,56 @@ interface BlocksRequest {
List<BlockHeaderWrapper> getBlockHeaders();
}

/**
* Handles result of {@link #addHeadersAndValidate(Collection)} invocation.
*
* <p>
* If {@code valid} is true then validation passed successfully
* and {@code headers} list contains the same result as if {@link #addHeaders(Collection)} was called.
* Otherwise, the list contains invalid headers.
*/
class ValidatedHeaders {
public static final ValidatedHeaders Empty = new ValidatedHeaders(null, true);

private final List<BlockHeaderWrapper> headers;
private final boolean valid;
private final String reason;

public ValidatedHeaders(List<BlockHeaderWrapper> headers, boolean valid, String reason) {
this.headers = headers;
this.valid = valid;
this.reason = reason;
}

public ValidatedHeaders(List<BlockHeaderWrapper> headers, boolean valid) {
this(headers, valid, "");
}

public boolean isValid() {
return valid;
}

public List<BlockHeaderWrapper> getHeaders() {
return headers;
}

public String getReason() {
return reason;
}

@Nullable
public byte[] getNodeId() {
if (headers == null || headers.isEmpty()) return null;
return headers.get(0).getNodeId();
}

@Nullable
public BlockHeader getHeader() {
if (headers == null || headers.isEmpty()) return null;
return headers.get(0).getHeader();
}
}

/**
* Returns wanted headers requests
* @param maxSize Maximum number of headers in a singles request
Expand All @@ -76,6 +129,18 @@ interface BlocksRequest {
*/
List<BlockHeaderWrapper> addHeaders(Collection<BlockHeaderWrapper> headers);

/**
* In general, does the same work as {@link #addHeaders(Collection)} does.
* But before trimming, the longest chain is checked with parent header validator.
* If validation is failed, the chain is erased from the queue.
*
* <p>
* <b>Note:</b> in reverse queue falls to {@link #addHeaders(Collection)} invocation
*
* @return check {@link ValidatedHeaders} for details
*/
ValidatedHeaders addHeadersAndValidate(Collection<BlockHeaderWrapper> headers);

/**
* Returns wanted blocks hashes
*/
Expand Down
86 changes: 86 additions & 0 deletions ethereumj-core/src/main/java/org/ethereum/sync/SyncQueueImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.ethereum.core.Blockchain;
import org.ethereum.db.ByteArrayWrapper;
import org.ethereum.util.ByteArrayMap;
import org.ethereum.validator.DependentBlockHeaderRule;

import java.util.*;
import java.util.function.Function;
Expand Down Expand Up @@ -183,6 +184,8 @@ public List<HeaderElement> getChildren() {

Random rnd = new Random(); // ;)

DependentBlockHeaderRule parentHeaderValidator = null;

public SyncQueueImpl(List<Block> initBlocks) {
init(initBlocks);
}
Expand Down Expand Up @@ -264,6 +267,10 @@ private boolean hasGaps() {

private void trimChain() {
List<HeaderElement> longestChain = getLongestChain();
trimChainImpl(longestChain);
}

private void trimChainImpl(List<HeaderElement> longestChain) {
if (longestChain.size() > MAX_CHAIN_LEN) {
long newTrimNum = getLongestChain().get(longestChain.size() - MAX_CHAIN_LEN).header.getNumber();
for (int i = 0; darkZoneNum < newTrimNum; darkZoneNum++, i++) {
Expand Down Expand Up @@ -360,6 +367,80 @@ public synchronized List<BlockHeaderWrapper> addHeaders(Collection<BlockHeaderWr
return null;
}

@Override
public ValidatedHeaders addHeadersAndValidate(Collection<BlockHeaderWrapper> headers) {
for (BlockHeaderWrapper header : headers) {
addHeader(header);
}

List<HeaderElement> longestChain = getLongestChain();
ValidatedHeaders result = validateChain(longestChain);
if (result.isValid()) {
trimChainImpl(longestChain);
} else {
// erase chain starting from first invalid header
eraseChain(longestChain, result.getHeaders().get(0).getNumber());
}

return result;
}

/**
* Runs parent header validation and returns after first occurrence of invalid header
*/
ValidatedHeaders validateChain(List<HeaderElement> chain) {
if (chain.size() <= MAX_CHAIN_LEN || parentHeaderValidator == null)
return ValidatedHeaders.Empty;

for (int i = 1; i < chain.size(); i++) {
BlockHeaderWrapper parent = chain.get(i - 1).header;
BlockHeaderWrapper header = chain.get(i).header;
if (!parentHeaderValidator.validate(header.getHeader(), parent.getHeader())) {
return new ValidatedHeaders(Collections.singletonList(header), false,
parentHeaderValidator.getErrors().isEmpty() ? "" : parentHeaderValidator.getErrors().get(0));
}
}

return ValidatedHeaders.Empty;
}

void eraseChain(List<HeaderElement> chain, long startFrom) {
if (chain.isEmpty())
return;

// prevent from going beyond dark zone
startFrom = Math.max(darkZoneNum + 1, startFrom);

HeaderElement head = chain.get(chain.size() - 1);
for (int i = chain.size() - 1; i >= 0; i--) {
HeaderElement el = chain.get(i);
if (el.header.getNumber() < startFrom) break; // erase up to startFrom number
Map<ByteArrayWrapper, HeaderElement> gen = headers.get(el.header.getNumber());
gen.remove(new ByteArrayWrapper(el.header.getHash()));
// clean empty gens
if (gen.isEmpty()) {
headers.remove(el.header.getNumber());
}
}

// adjust maxNum
if (head.header.getNumber() == maxNum) {
Map<ByteArrayWrapper, HeaderElement> lastValidatedGen = headers.get(darkZoneNum);
assert lastValidatedGen.size() == 1;
long maxNotEmptyGen = lastValidatedGen.values().iterator().next().header.getNumber();

// find new maxNum after chain has been erased
for (long num = head.header.getNumber(); num >= darkZoneNum; num--) {
Map<ByteArrayWrapper, HeaderElement> gen = headers.get(num);
if (gen != null && !gen.isEmpty() && num > maxNotEmptyGen) {
maxNotEmptyGen = num;
break;
}
}
maxNum = maxNotEmptyGen;
}
}

@Override
public synchronized int getHeadersCount() {
return (int) (maxNum - minNum);
Expand Down Expand Up @@ -438,6 +519,11 @@ public synchronized List<Block> pollBlocks() {
return null;
}

public SyncQueueImpl withParentHeaderValidator(DependentBlockHeaderRule validator) {
this.parentHeaderValidator = validator;
return this;
}


interface Visitor<T> {
T visit(HeaderElement el, List<T> childrenRes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,12 @@ public synchronized List<BlockHeaderWrapper> addHeaders(Collection<BlockHeaderWr
}
}

@Override
public ValidatedHeaders addHeadersAndValidate(Collection<BlockHeaderWrapper> headers) {
List<BlockHeaderWrapper> added = addHeaders(headers);
return new ValidatedHeaders(added, true);
}

@Override
public synchronized BlocksRequest requestBlocks(int maxSize) {
List<BlockHeaderWrapper> reqHeaders = new ArrayList<>();
Expand Down

0 comments on commit bd8bbcb

Please sign in to comment.