Skip to content
This repository has been archived by the owner on Jul 1, 2021. It is now read-only.

Commit

Permalink
Merge pull request #2109 from carver/witness-hash-union-2105
Browse files Browse the repository at this point in the history
Store union of hashes for block witness
  • Loading branch information
carver authored Nov 27, 2020
2 parents 887a6b4 + 653f264 commit 0deec1a
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 10 deletions.
76 changes: 71 additions & 5 deletions tests/core/p2p-proto/test_wit_db.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import pytest

from eth.db.atomic import AtomicDB
from eth.db.backends.memory import MemoryDB

from trinity.exceptions import WitnessHashesUnavailable
from trinity.protocol.wit.db import AsyncWitnessDB
Expand All @@ -9,20 +9,22 @@

@pytest.mark.asyncio
async def test_persisting_and_looking_up():
wit_db = AsyncWitnessDB(AtomicDB())
wit_db = AsyncWitnessDB(MemoryDB())

hash1 = Hash32Factory()
with pytest.raises(WitnessHashesUnavailable):
await wit_db.coro_get_witness_hashes(hash1)

hash1_witnesses = tuple(Hash32Factory.create_batch(5))
await wit_db.coro_persist_witness_hashes(hash1, hash1_witnesses)
assert await wit_db.coro_get_witness_hashes(hash1) == hash1_witnesses
loaded_hashes = await wit_db.coro_get_witness_hashes(hash1)

assert set(loaded_hashes) == set(hash1_witnesses)


@pytest.mark.asyncio
async def test_witness_for_recent_blocks():
wit_db = AsyncWitnessDB(AtomicDB())
wit_db = AsyncWitnessDB(MemoryDB())
hash1 = Hash32Factory()
hash1_witnesses = tuple(Hash32Factory.create_batch(5))
await wit_db.coro_persist_witness_hashes(hash1, hash1_witnesses)
Expand All @@ -32,7 +34,8 @@ async def test_witness_for_recent_blocks():
await wit_db.coro_persist_witness_hashes(Hash32Factory(), Hash32Factory.create_batch(2))

# It should still be there...
assert await wit_db.coro_get_witness_hashes(hash1) == hash1_witnesses
loaded_hashes = await wit_db.coro_get_witness_hashes(hash1)
assert set(loaded_hashes) == set(hash1_witnesses)

# Until one more new witness is added.
await wit_db.coro_persist_witness_hashes(Hash32Factory(), Hash32Factory.create_batch(2))
Expand All @@ -42,3 +45,66 @@ async def test_witness_for_recent_blocks():
await wit_db.coro_get_witness_hashes(hash1)

assert len(wit_db._get_recent_blocks_with_witnesses()) == wit_db._max_witness_history


@pytest.mark.asyncio
async def test_witness_history_on_repeat_blocks():
"""
Repeated blocks should not consume more slots in the limited history of block witnesses
"""
wit_db = AsyncWitnessDB(MemoryDB())
hash1 = Hash32Factory()
hash1_witnesses = tuple(Hash32Factory.create_batch(5))
await wit_db.coro_persist_witness_hashes(hash1, hash1_witnesses)

hash2 = Hash32Factory()
await wit_db.coro_persist_witness_hashes(hash2, tuple(Hash32Factory.create_batch(5)))

# *almost* push the first witness out of history
for _ in range(wit_db._max_witness_history - 2):
await wit_db.coro_persist_witness_hashes(Hash32Factory(), Hash32Factory.create_batch(2))

# It should still be there...
loaded_hashes = await wit_db.coro_get_witness_hashes(hash1)
assert set(loaded_hashes) == set(hash1_witnesses)

# Add one more new witness, for an existing block
await wit_db.coro_persist_witness_hashes(hash2, Hash32Factory.create_batch(2))

# That new witness should *not* consume a block slot in history, so the first hash's
# witness should still be available.
loaded_hashes = await wit_db.coro_get_witness_hashes(hash1)
assert set(loaded_hashes) == set(hash1_witnesses)


@pytest.mark.asyncio
async def test_witness_eviction_on_repeat_blocks():
"""
After witnesses are persisted twice for the same block, make sure that eviction
does not cause any failures.
"""
wit_db = AsyncWitnessDB(MemoryDB())
hash_ = Hash32Factory()
await wit_db.coro_persist_witness_hashes(hash_, Hash32Factory.create_batch(2))
await wit_db.coro_persist_witness_hashes(hash_, Hash32Factory.create_batch(2))
for _ in range(wit_db._max_witness_history):
await wit_db.coro_persist_witness_hashes(Hash32Factory(), Hash32Factory.create_batch(2))


@pytest.mark.asyncio
async def test_witness_union():
wit_db = AsyncWitnessDB(MemoryDB())
hash1 = Hash32Factory()
hash1_witnesses_unique1 = set(Hash32Factory.create_batch(3))
hash1_witnesses_unique2 = set(Hash32Factory.create_batch(3))
hash1_witnesses_both = set(Hash32Factory.create_batch(2))
hash1_witnesses1 = tuple(hash1_witnesses_unique1 | hash1_witnesses_both)
hash1_witnesses2 = tuple(hash1_witnesses_unique2 | hash1_witnesses_both)

await wit_db.coro_persist_witness_hashes(hash1, hash1_witnesses1)
await wit_db.coro_persist_witness_hashes(hash1, hash1_witnesses2)

stored_hashes = await wit_db.coro_get_witness_hashes(hash1)

expected = hash1_witnesses_unique1 | hash1_witnesses_both | hash1_witnesses_unique2
assert set(stored_hashes) == expected
23 changes: 20 additions & 3 deletions trinity/protocol/wit/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,31 @@ def persist_witness_hashes(
except KeyError:
recent_blocks_with_witnesses = []

while len(recent_blocks_with_witnesses) >= self._max_witness_history:
# Add in the new block, if it's not already present
if block_hash not in recent_blocks_with_witnesses:
recent_blocks_with_witnesses.append(block_hash)

# Flush out old witnesses
while len(recent_blocks_with_witnesses) > self._max_witness_history:
oldest_block_witness = recent_blocks_with_witnesses.pop(0)
del self.db[self._make_block_witness_hashes_lookup_key(oldest_block_witness)]

recent_blocks_with_witnesses.append(block_hash)
# Store new reference to existing witness
self.db[self._recent_blocks_with_witnesses_lookup_key] = rlp.encode(
recent_blocks_with_witnesses)
self.db[self._make_block_witness_hashes_lookup_key(block_hash)] = rlp.encode(witness_hashes)

try:
# Note: if this call is converted to async, watch out for the race
# condition of two persists that interleave. It would be
# possible for one to overwrite the other. For now, the synchronous
# approach means that that isn't a concern.
existing_hashes = self.get_witness_hashes(block_hash)
except WitnessHashesUnavailable:
existing_hashes = ()

block_hashes_key = self._make_block_witness_hashes_lookup_key(block_hash)
combined_hashes = tuple(set(existing_hashes).union(witness_hashes))
self.db[block_hashes_key] = rlp.encode(combined_hashes)

coro_get_witness_hashes = async_method(get_witness_hashes)
coro_persist_witness_hashes = async_method(persist_witness_hashes)
4 changes: 2 additions & 2 deletions trinity/sync/beam/importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,13 @@ def avg_rtt(self) -> float:
def __str__(self) -> str:
avg_rtt = self.avg_rtt

wait_time = humanize_seconds(self.data_pause_time)
wait_time = self.data_pause_time

return (
f"BeamStat: accts={self.num_accounts}, "
f"a_nodes={self.num_account_nodes}, codes={self.num_bytecodes}, "
f"strg={self.num_storages}, s_nodes={self.num_storage_nodes}, "
f"nodes={self.num_nodes}, rtt={avg_rtt:.3f}s, wait={wait_time}"
f"nodes={self.num_nodes}, rtt={avg_rtt:.3f}s, wait={wait_time:.1f}s"
)

def __repr__(self) -> str:
Expand Down

0 comments on commit 0deec1a

Please sign in to comment.