Merge pull request #333 from pipermerriam/piper/implement-getContent-JSON-RPC
Expand Alexandria JSON-RPC API
pipermerriam committed Dec 19, 2020
2 parents 7a3882a + 3a96771 commit 64fba8f
Showing 8 changed files with 537 additions and 176 deletions.
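The heart of the change is visible in the diff below: the exfiltration tool's export step becomes a pluggable callback, and one of the new callbacks pushes headers into a ddht node over the expanded Alexandria JSON-RPC API. As orientation, here is a minimal sketch of that export path, not part of the commit itself; the IPC path is a placeholder and a running ddht node with the Alexandria module enabled is assumed:

import pathlib
from typing import Sequence

import rlp
from web3 import IPCProvider, Web3

from ddht.tools.exfiltration import header_key
from ddht.tools.w3_alexandria import AlexandriaModule
from ddht.v5_1.alexandria.rlp_sedes import BlockHeader

# Placeholder path; point this at a running ddht node's JSON-RPC IPC socket.
IPC_PATH = pathlib.Path("~/.ddht/jsonrpc.ipc").expanduser()

# Registering AlexandriaModule exposes the new JSON-RPC methods as `w3.alexandria.*`,
# matching the `modules={...}` pattern used in the diff.
w3 = Web3(IPCProvider(IPC_PATH, timeout=30), modules={"alexandria": (AlexandriaModule,)})


def export_headers(headers: Sequence[BlockHeader]) -> None:
    # Same shape as the diff's `export_alexandria_w3` callback: push each
    # RLP-encoded header into the node's commons content storage.
    for header in headers:
        w3.alexandria.add_commons_content(header_key(header.hash), rlp.encode(header))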
231 changes: 61 additions & 170 deletions ddht/tools/exfiltration.py
@@ -1,162 +1,72 @@
import argparse
import itertools
import logging
import pathlib
import tempfile
from typing import NamedTuple, Optional, Tuple
from typing import Callable, Optional, Sequence, Tuple

from eth_typing import Hash32
from eth_utils import big_endian_to_int, encode_hex, to_canonical_address
import rlp
import ssz
from ssz import sedes as ssz_sedes
import trio
from web3 import Web3
from web3 import IPCProvider, Web3

from ddht.logging import setup_stderr_logging
from ddht.v5_1.alexandria.content_storage import (
    ContentNotFound,
    FileSystemContentStorage,
)
from ddht.tools.w3_alexandria import AlexandriaModule
from ddht.v5_1.alexandria.content_storage import FileSystemContentStorage
from ddht.v5_1.alexandria.rlp_sedes import BlockHeader
from ddht.v5_1.alexandria.typing import ContentKey

logger = logging.getLogger("ddht.tools.exfiltration")


class AccumulatorLeaf(NamedTuple):
    block_hash: Hash32
    total_difficulty: int


# The number of headers that make up each epoch accumulator.
EPOCH_SIZE = 2048


AccumulatorLeafSedes = ssz_sedes.Container(
    field_sedes=(ssz_sedes.bytes32, ssz_sedes.uint256)
)
EpochAccumulatorSedes = ssz_sedes.List(AccumulatorLeafSedes, max_length=EPOCH_SIZE)

# This gives room for 2 ** 24 epoch roots (~34 billion headers) in the accumulator.
MasterAccumulatorSedes = ssz_sedes.List(ssz_sedes.bytes32, max_length=2 ** 24)


def header_key(block_hash: Hash32) -> ContentKey:
    return ContentKey(b"\x01" + block_hash)


def epoch_accumulator_key(epoch: int) -> ContentKey:
    return ContentKey(b"\x02" + epoch.to_bytes(3, "big"))


def master_accumulator_key(epoch: int) -> ContentKey:
    return ContentKey(b"\x03" + epoch.to_bytes(3, "big"))
def get_content_storage_export_fn(
    storage_dir: pathlib.Path,
) -> Callable[[Sequence[BlockHeader]], None]:
    storage_dir.mkdir(parents=True, exist_ok=True)
    content_storage = FileSystemContentStorage(storage_dir)
    logger.info("ContentStorage: %s", storage_dir)

    def export_content_storage(headers: Sequence[BlockHeader]) -> None:
        for header in headers:
            content_storage.set_content(
                header_key(header.hash), rlp.encode(header),
            )

LATEST_MASTER_ACCUMULATOR_KEY = ContentKey(b"meta:master-accumulator:latest")
    return export_content_storage


def compute_epoch_accumulator(
    headers: Tuple[BlockHeader, ...], previous_total_difficulty: int
) -> Tuple[AccumulatorLeaf, ...]:
    if len(headers) != EPOCH_SIZE:
        raise ValueError(f"Insufficient headers: need={EPOCH_SIZE} got={len(headers)}")
def get_direct_w3_export_fn(
    ipc_path: pathlib.Path,
) -> Callable[[Sequence[BlockHeader]], None]:
    w3 = Web3(
        IPCProvider(ipc_path, timeout=30), modules={"alexandria": (AlexandriaModule,)}
    )

    def accumulate_total_difficulties(
        previous: AccumulatorLeaf, header: BlockHeader
    ) -> AccumulatorLeaf:
        return AccumulatorLeaf(
            block_hash=header.hash,
            total_difficulty=previous.total_difficulty + header.difficulty,
        )
    def export_alexandria_w3(headers: Sequence[BlockHeader]) -> None:
        for header in headers:
            w3.alexandria.add_commons_content(
                header_key(header.hash), rlp.encode(header),
            )

    first_leaf = AccumulatorLeaf(
        block_hash=headers[0].hash,
        total_difficulty=previous_total_difficulty + headers[0].difficulty,
    )
    accumulator = tuple(
        itertools.accumulate(
            headers[1:], accumulate_total_difficulties, initial=first_leaf,
        )
    )
    return accumulator
    return export_alexandria_w3


async def exfiltrate_header_chain(
    w3: Web3,
    epoch_start_at: Optional[int],
    epoch_end_at: Optional[int],
    storage_dir: Optional[pathlib.Path],
    export_fn: Callable[[Sequence[BlockHeader]], None],
    truncate: bool = False,
) -> None:
    if storage_dir is None:
        storage_dir = pathlib.Path(tempfile.mkdtemp())

    storage_dir.mkdir(parents=True, exist_ok=True)
    content_storage = FileSystemContentStorage(storage_dir)
    logger.info("ContentStorage: %s", storage_dir)

    try:
        master_accumulator_data = content_storage.get_content(
            LATEST_MASTER_ACCUMULATOR_KEY
        )
    except ContentNotFound:
        master_accumulator = []
    else:
        master_accumulator = ssz.decode(
            master_accumulator_data, sedes=MasterAccumulatorSedes
        )

    if epoch_start_at is None:
        epoch_start_at = len(master_accumulator)
    elif epoch_start_at > len(master_accumulator):
        raise Exception(
            f"Start epoch after master accumulator: "
            f"start-epoch={epoch_start_at} master={len(master_accumulator)}"
        )
    elif len(master_accumulator) > epoch_start_at:
        if truncate is False:
            num_to_truncate = len(master_accumulator) - epoch_start_at
            raise Exception(
                f"Pass `--truncate` to overwrite history. This will erase "
                f"{num_to_truncate} epochs of accumulator history."
            )
        logger.info(
            "Deleting %d previous accumulators from epoch #%d - %d",
            len(master_accumulator) - epoch_start_at,
            epoch_start_at,
            len(master_accumulator) - 1,
        )
        for epoch in range(epoch_start_at, len(master_accumulator)):
            try:
                content_storage.delete_content(epoch_accumulator_key(epoch))
            except ContentNotFound:
                logger.debug("missing epoch accumulator for deletion: epoch=%d", epoch)
            else:
                logger.debug("deleted epoch accumulator: epoch=%d", epoch)

            try:
                content_storage.delete_content(master_accumulator_key(epoch))
            except ContentNotFound:
                logger.debug("missing master accumulator for deletion: epoch=%d", epoch)
            else:
                logger.debug("deleted master accumulator: epoch=%d", epoch)

        if len(master_accumulator) > 1:
            latest_master_epoch = len(master_accumulator) - 1
            master_accumulator_data = content_storage.get_content(
                master_accumulator_key(latest_master_epoch)
            )
            master_accumulator = ssz.decode(
                master_accumulator_data, sedes=MasterAccumulatorSedes,
            )
            content_storage.set_content(
                LATEST_MASTER_ACCUMULATOR_KEY, master_accumulator_data, exists_ok=True,
            )
        else:
            assert epoch_start_at == 0
            master_accumulator = []
            epoch_start_at = 0

    if epoch_end_at is None:
        latest_block_number = w3.eth.blockNumber
@@ -166,66 +76,34 @@ async def exfiltrate_header_chain(
"Starting exfiltration of epochs #%d -> %d", epoch_start_at, epoch_end_at
)

if epoch_start_at == 0:
master_accumulator = []
previous_total_difficulty = 0
elif epoch_start_at == len(master_accumulator):
prevous_epoch_accumulator_data = content_storage.get_content(
epoch_accumulator_key(epoch_start_at - 1)
)
prevous_epoch_accumulator = ssz.decode(
prevous_epoch_accumulator_data, sedes=EpochAccumulatorSedes,
)
previous_total_difficulty = prevous_epoch_accumulator[-1][1]
else:
raise Exception("Invariant")

for epoch in range(epoch_start_at, epoch_end_at):
logger.debug("starting: epoch=%d", epoch)
start_at = trio.current_time()

headers = await retrieve_epoch_headers(w3, epoch)
for header in headers:
content_storage.set_content(
header_key(header.hash), rlp.encode(header),
)
retrieval_end_at = trio.current_time()

epoch_accumulator = compute_epoch_accumulator(
headers, previous_total_difficulty
)
assert len(epoch_accumulator) == EPOCH_SIZE
epoch_accumulator_root = ssz.get_hash_tree_root(
epoch_accumulator, sedes=EpochAccumulatorSedes,
)
# content_storage.set_content(
# epoch_accumulator_key(epoch),
# ssz.encode(epoch_accumulator, sedes=EpochAccumulatorSedes),
# )

master_accumulator.append(epoch_accumulator_root)
master_accumulator_root = ssz.get_hash_tree_root(
master_accumulator, sedes=MasterAccumulatorSedes,
)
export_fn(headers)

# set the accumulator for the epoch
# master_accumulator_data = ssz.encode(
# master_accumulator, sedes=MasterAccumulatorSedes
# )
# content_storage.set_content(
# master_accumulator_key(epoch), master_accumulator_data
# )
# content_storage.set_content(
# LATEST_MASTER_ACCUMULATOR_KEY, master_accumulator_data, exists_ok=True,
# )
export_end_at = trio.current_time()

elapsed_total = export_end_at - start_at
elapsed_retrieve = retrieval_end_at - start_at
elapsed_export = export_end_at - retrieval_end_at

logger.info(
"completed: epoch=%d blocks=#%d-%d epoch_root=%s master_root=%s",
"completed: epoch=%d blocks=#%d-%d bps=%.1f (ret=%.1f exp=%.2f) "
"elapsed=%.1fs (%.1f / %.1f)",
epoch,
epoch * EPOCH_SIZE,
epoch * EPOCH_SIZE + EPOCH_SIZE,
epoch_accumulator_root.hex(),
master_accumulator_root.hex(),
EPOCH_SIZE / elapsed_total,
EPOCH_SIZE / elapsed_retrieve,
EPOCH_SIZE / elapsed_export,
elapsed_total,
100 * elapsed_retrieve / elapsed_total,
100 * elapsed_export / elapsed_total,
)
previous_total_difficulty = epoch_accumulator[-1].total_difficulty

logger.info("Finished exfiltration")

@@ -297,9 +175,12 @@ async def retrieve_header(w3: Web3, block_number: int) -> BlockHeader:
parser = argparse.ArgumentParser(description="Block Header Exfiltration")
parser.add_argument("--start-epoch", type=int)
parser.add_argument("--end-epoch", type=int)
parser.add_argument("--storage-dir", type=pathlib.Path)
parser.add_argument("--log-level", type=int, default=logging.INFO)

export_parser = parser.add_mutually_exclusive_group()
export_parser.add_argument("--storage-dir", type=pathlib.Path)
export_parser.add_argument("--export-w3-ipc", type=pathlib.Path)


LOGGERS_TO_MAKE_QUIET = (
("web3.providers.IPCProvider", logging.INFO),
@@ -320,6 +201,16 @@ async def retrieve_header(w3: Web3, block_number: int) -> BlockHeader:
    setup_stderr_logging(args.log_level)
    from web3.auto.ipc import w3

    export_fn: Callable[[Sequence[BlockHeader]], None]

    if args.storage_dir is not None:
        export_fn = get_content_storage_export_fn(args.storage_dir)
    elif args.export_w3_ipc is not None:
        export_fn = get_direct_w3_export_fn(args.export_w3_ipc)
    else:
        temp_storage_dir = pathlib.Path(tempfile.mkdtemp())
        export_fn = get_content_storage_export_fn(temp_storage_dir)

    trio.run(
        exfiltrate_header_chain, w3, args.start_epoch, args.end_epoch, args.storage_dir,
        exfiltrate_header_chain, w3, args.start_epoch, args.end_epoch, export_fn,
    )
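With the rewiring above, the entry point boils down to choosing an export callback and handing it to `exfiltrate_header_chain`. A rough sketch, not part of the commit, of driving the tool programmatically; the storage directory is hypothetical, equivalent to passing `--storage-dir`, and `web3.auto.ipc` assumes a local Ethereum node's IPC socket is discoverable:

import pathlib

import trio
from web3.auto.ipc import w3

from ddht.tools.exfiltration import (
    exfiltrate_header_chain,
    get_content_storage_export_fn,
)

# Hypothetical output directory for the on-disk content store.
export_fn = get_content_storage_export_fn(pathlib.Path("./alexandria-content"))

# Exfiltrate epochs 0 and 1, i.e. headers #0-4095 given EPOCH_SIZE = 2048.
trio.run(exfiltrate_header_chain, w3, 0, 2, export_fn)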
