Skip to content

Commit

Permalink
Merge pull request #818 from aionnetwork/sync-state-3
Browse files Browse the repository at this point in the history
Sync State 3: Handler for trie data response messages
  • Loading branch information
AlexandraRoatis committed Feb 14, 2019
2 parents 31ffecf + 900241c commit 6cbf05f
Show file tree
Hide file tree
Showing 4 changed files with 354 additions and 0 deletions.
105 changes: 105 additions & 0 deletions modAionImpl/src/org/aion/zero/impl/sync/TrieNodeWrapper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package org.aion.zero.impl.sync;

import java.util.Map;
import java.util.Objects;
import org.aion.base.util.ByteArrayWrapper;
import org.aion.zero.impl.sync.msg.ResponseTrieData;

/**
* Container for received trie node requests.
*
* @author Alexandra Roatis
*/
public final class TrieNodeWrapper {

private final int peerId;
private final String displayId;
private final ResponseTrieData data;

/**
* Constructor.
*
* @param peerId the hash id of the peer who sent the response
* @param displayId the display id of the peer who sent the response
* @param data the response received from the peer containing the trie node data
*/
public TrieNodeWrapper(final int peerId, final String displayId, final ResponseTrieData data) {
this.peerId = peerId;
this.displayId = displayId;
this.data = data;
}

/**
* Returns the hash id of the peer who sent the response.
*
* @return the hash id of the peer who sent the response.
*/
public int getPeerId() {
return peerId;
}

/**
* Returns the display id of the peer who sent the response.
*
* @return the display id of the peer who sent the response.
*/
public String getDisplayId() {
return displayId;
}

/**
* Returns the key of the requested trie node.
*
* @return the key of the requested trie node.
*/
public ByteArrayWrapper getNodeKey() {
return data.getNodeKey();
}

/**
* Returns the value stored for the requested trie node.
*
* @return the value stored for the requested trie node.
*/
public byte[] getNodeValue() {
return data.getNodeValue();
}

/**
* Returns the key-value pairs for the requested trie node.
*
* @return the key-value pairs for the requested trie node.
* @implNote The map is not immutable. One should avoid wrapping the same object multiple times.
*/
public Map<ByteArrayWrapper, byte[]> getReferencedNodes() {
return data.getReferencedNodes();
}

/**
* Returns the blockchain database in which the requested key was found.
*
* @return the blockchain database in which the requested key was found.
*/
public DatabaseType getDbType() {
return data.getDbType();
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
TrieNodeWrapper that = (TrieNodeWrapper) o;
return peerId == that.peerId
&& Objects.equals(displayId, that.displayId)
&& Objects.equals(data, that.data);
}

@Override
public int hashCode() {
return Objects.hash(peerId, displayId, data);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package org.aion.zero.impl.sync.handler;

import java.util.Arrays;
import java.util.concurrent.BlockingQueue;
import org.aion.p2p.Ctrl;
import org.aion.p2p.Handler;
import org.aion.p2p.Ver;
import org.aion.zero.impl.sync.Act;
import org.aion.zero.impl.sync.TrieNodeWrapper;
import org.aion.zero.impl.sync.msg.ResponseTrieData;
import org.slf4j.Logger;

/**
* Handler for trie node responses from the network.
*
* @author Alexandra Roatis
*/
public final class ResponseTrieDataHandler extends Handler {

private final Logger log;

private final BlockingQueue<TrieNodeWrapper> states;

/**
* Constructor.
*
* @param log logger for reporting execution information
* @param states map containing the received states to be processed
*/
public ResponseTrieDataHandler(final Logger log, final BlockingQueue<TrieNodeWrapper> states) {
super(Ver.V1, Ctrl.SYNC, Act.RESPONSE_TRIE_DATA);
this.log = log;
this.states = states;
}

@Override
public void receive(int peerId, String displayId, final byte[] message) {
if (message == null || message.length == 0) {
this.log.debug("<res-trie empty message from peer={}>", displayId);
return;
}

ResponseTrieData response = ResponseTrieData.decode(message);

if (response != null) {
if (log.isDebugEnabled()) {
this.log.debug("<res-trie response={} peer={}>", response, displayId);
}

states.add(new TrieNodeWrapper(peerId, displayId, response));
} else {
this.log.error("<res-trie decode-error msg-bytes={} peer={}>", message.length, peerId);

if (log.isTraceEnabled()) {
this.log.trace(
"<res-trie decode-error for msg={} peer={}>",
Arrays.toString(message),
peerId);
}
}
}
}
40 changes: 40 additions & 0 deletions modAionImpl/test/org/aion/zero/impl/sync/TrieNodeWrapperTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package org.aion.zero.impl.sync;

import static com.google.common.truth.Truth.assertThat;
import static org.aion.zero.impl.sync.DatabaseType.STATE;
import static org.aion.zero.impl.sync.msg.RequestTrieDataTest.nodeKey;
import static org.aion.zero.impl.sync.msg.ResponseTrieDataTest.leafValue;

import org.aion.rlp.RLP;
import org.aion.zero.impl.sync.msg.ResponseTrieData;
import org.junit.Test;

/**
* Unit tests for {@link TrieNodeWrapper}.
*
* @author Alexandra Roatis
*/
public class TrieNodeWrapperTest {
private static final int peerId = Integer.MAX_VALUE;
private static final String displayId = "abcdef";

@Test
public void testConstructor() {
byte[] encoding =
RLP.encodeList(
RLP.encodeElement(nodeKey),
RLP.encodeElement(leafValue),
RLP.encodeList(new byte[0]),
RLP.encodeString(STATE.toString()));
ResponseTrieData response = ResponseTrieData.decode(encoding);

TrieNodeWrapper node = new TrieNodeWrapper(peerId, displayId, response);

assertThat(node.getPeerId()).isEqualTo(peerId);
assertThat(node.getDisplayId()).isEqualTo(displayId);
assertThat(node.getNodeKey()).isEqualTo(response.getNodeKey());
assertThat(node.getNodeValue()).isEqualTo(response.getNodeValue());
assertThat(node.getReferencedNodes()).isEqualTo(response.getReferencedNodes());
assertThat(node.getDbType()).isEqualTo(response.getDbType());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package org.aion.zero.impl.sync.handler;

import static com.google.common.truth.Truth.assertThat;
import static org.aion.zero.impl.sync.DatabaseType.STATE;
import static org.aion.zero.impl.sync.msg.RequestTrieDataTest.altNodeKey;
import static org.aion.zero.impl.sync.msg.RequestTrieDataTest.nodeKey;
import static org.aion.zero.impl.sync.msg.ResponseTrieDataTest.leafValue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;

import java.util.Arrays;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.aion.rlp.RLP;
import org.aion.zero.impl.sync.TrieNodeWrapper;
import org.aion.zero.impl.sync.msg.ResponseTrieData;
import org.junit.Test;
import org.slf4j.Logger;

/**
* Unit tests for {@link ResponseTrieDataHandler}.
*
* @author Alexandra Roatis
*/
public class ResponseTrieDataHandlerTest {
private static final int peerId = Integer.MAX_VALUE;
private static final String displayId = "abcdef";

@Test
public void testReceive_nullMessage() {
Logger log = mock(Logger.class);
BlockingQueue<TrieNodeWrapper> receivedQueue = mock(LinkedBlockingQueue.class);

ResponseTrieDataHandler handler = new ResponseTrieDataHandler(log, receivedQueue);

// receive null message
handler.receive(peerId, displayId, null);

verify(log, times(1)).debug("<res-trie empty message from peer={}>", displayId);
verifyZeroInteractions(receivedQueue);
}

@Test
public void testReceive_emptyMessage() {
Logger log = mock(Logger.class);
BlockingQueue<TrieNodeWrapper> receivedQueue = mock(LinkedBlockingQueue.class);

ResponseTrieDataHandler handler = new ResponseTrieDataHandler(log, receivedQueue);

// receive empty message
handler.receive(peerId, displayId, new byte[0]);

verify(log, times(1)).debug("<res-trie empty message from peer={}>", displayId);
verifyZeroInteractions(receivedQueue);
}

@Test
public void testReceive_incorrectMessage() {
Logger log = mock(Logger.class);
when(log.isTraceEnabled()).thenReturn(false);

BlockingQueue<TrieNodeWrapper> receivedQueue = mock(LinkedBlockingQueue.class);

ResponseTrieDataHandler handler = new ResponseTrieDataHandler(log, receivedQueue);

// receive incorrect message
byte[] outOfOderEncoding =
RLP.encodeList(
RLP.encodeList(
RLP.encodeList(
RLP.encodeElement(nodeKey), RLP.encodeElement(leafValue)),
RLP.encodeList(
RLP.encodeElement(altNodeKey),
RLP.encodeElement(leafValue))),
RLP.encodeElement(nodeKey),
RLP.encodeElement(leafValue),
RLP.encodeString(STATE.toString()));
handler.receive(peerId, displayId, outOfOderEncoding);

verify(log, times(1))
.error(
"<res-trie decode-error msg-bytes={} peer={}>",
outOfOderEncoding.length,
peerId);
verifyZeroInteractions(receivedQueue);
}

@Test
public void testReceive_incorrectMessage_withTrace() {
Logger log = mock(Logger.class);
when(log.isTraceEnabled()).thenReturn(true);

BlockingQueue<TrieNodeWrapper> receivedQueue = mock(LinkedBlockingQueue.class);

ResponseTrieDataHandler handler = new ResponseTrieDataHandler(log, receivedQueue);

// receive incorrect message
byte[] outOfOderEncoding =
RLP.encodeList(
RLP.encodeList(
RLP.encodeList(
RLP.encodeElement(nodeKey), RLP.encodeElement(leafValue)),
RLP.encodeList(
RLP.encodeElement(altNodeKey),
RLP.encodeElement(leafValue))),
RLP.encodeElement(nodeKey),
RLP.encodeElement(leafValue),
RLP.encodeString(STATE.toString()));
handler.receive(peerId, displayId, outOfOderEncoding);

verify(log, times(1))
.trace(
"<res-trie decode-error for msg={} peer={}>",
Arrays.toString(outOfOderEncoding),
peerId);
verifyZeroInteractions(receivedQueue);
}

@Test
public void testReceive_correctMessage() {
Logger log = mock(Logger.class);
when(log.isDebugEnabled()).thenReturn(true);

BlockingQueue<TrieNodeWrapper> receivedQueue = new LinkedBlockingQueue<>();

ResponseTrieDataHandler handler = new ResponseTrieDataHandler(log, receivedQueue);

// receive correct message
byte[] encoding =
RLP.encodeList(
RLP.encodeElement(nodeKey),
RLP.encodeElement(leafValue),
RLP.encodeList(new byte[0]),
RLP.encodeString(STATE.toString()));
handler.receive(peerId, displayId, encoding);

ResponseTrieData response = ResponseTrieData.decode(encoding);

verify(log, times(1)).debug("<res-trie response={} peer={}>", response, displayId);

TrieNodeWrapper node = new TrieNodeWrapper(peerId, displayId, response);
assertThat(receivedQueue).containsExactly(node);
}
}

0 comments on commit 6cbf05f

Please sign in to comment.