Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initialize genesis shard state based on root block height #80

Merged
merged 8 commits into from Sep 13, 2018
1 change: 0 additions & 1 deletion quarkchain/cluster/cluster_config.py
Expand Up @@ -259,7 +259,6 @@ def create_from_args(cls, args):

config.LOADTEST = args.loadtest
update_genesis_config(config.QUARKCHAIN, config.LOADTEST)
GenesisManager.finalize_config(config.QUARKCHAIN)

config.MONITORING.KAFKA_REST_ADDRESS = args.monitoring_kafka_rest_address

Expand Down
23 changes: 19 additions & 4 deletions quarkchain/cluster/master.py
Expand Up @@ -300,8 +300,13 @@ def has_overlap(self, shard_mask):
return True
return False

async def send_ping(self):
req = Ping("", [], self.master_server.root_state.get_tip_block())
async def send_ping(self, initialize_shard_state=False):
root_block = (
self.master_server.root_state.get_tip_block()
if initialize_shard_state
else None
)
req = Ping("", [], root_block)
op, resp, rpc_id = await self.write_rpc_request(
op=ClusterOp.PING,
cmd=req,
Expand Down Expand Up @@ -622,6 +627,12 @@ async def __setup_slave_to_slave_connections(self):
if not success:
self.shutdown()

async def __init_shards(self):
futures = []
for slave in self.slave_pool:
futures.append(slave.send_ping(initialize_shard_state=True))
await asyncio.gather(*futures)

async def __send_mining_config_to_slaves(self, mining):
futures = []
for slave in self.slave_pool:
Expand Down Expand Up @@ -666,6 +677,7 @@ async def __init_cluster(self):
Logger.error("Missing some shards. Check cluster config file!")
return
await self.__setup_slave_to_slave_connections()
await self.__init_shards()

self.cluster_active_future.set_result(None)

Expand Down Expand Up @@ -739,7 +751,10 @@ async def __create_root_block_to_mine_or_fallback_to_minor_block(self, address):

header_list = []
# check proof of progress
for shard_id in range(self.__get_shard_size()):
shard_ids_to_check = self.env.quark_chain_config.get_initialized_shard_ids_before_root_height(
self.root_state.tip.height + 1
)
for shard_id in shard_ids_to_check:
headers = shard_id_to_header_list.get(shard_id, [])
header_list.extend(headers)
if len(headers) < self.env.quark_chain_config.PROOF_OF_PROGRESS_BLOCKS:
Expand Down Expand Up @@ -872,7 +887,7 @@ async def get_account_data(self, address: Address):
account_branch_data.branch
] = account_branch_data

check(len(branch_to_account_branch_data) == self.__get_shard_size())
check(len(branch_to_account_branch_data) == len(self.env.quark_chain_config.get_genesis_shard_ids()))
return branch_to_account_branch_data

async def get_primary_account_data(
Expand Down
148 changes: 72 additions & 76 deletions quarkchain/cluster/root_state.py
Expand Up @@ -171,8 +171,6 @@ def __create_genesis_block(self):
genesis_block = genesis_manager.create_root_block()
self.db.put_root_block(genesis_block, [])
self.db.put_root_block_index(genesis_block)
for i in range(genesis_block.header.shard_info.get_shard_size()):
self.db.put_minor_block_hash(genesis_manager.get_minor_block_hash(i))
self.tip = genesis_block.header

def get_tip_block(self):
Expand Down Expand Up @@ -259,15 +257,6 @@ def validate_block(self, block, block_hash=None):
if not self.db.contain_root_block_by_hash(block.header.hash_prev_block):
raise ValueError("previous hash block mismatch")

prev_last_minor_block_header_list = self.db.get_root_block_last_minor_block_header_list(
block.header.hash_prev_block
)

if block.header.height > 1:
check(len(prev_last_minor_block_header_list) > 0)
else:
check(len(prev_last_minor_block_header_list) == 0)

block_hash = self.validate_block_header(block.header, block_hash)

# Check the merkle tree
Expand All @@ -276,84 +265,91 @@ def validate_block(self, block, block_hash=None):
raise ValueError("incorrect merkle root")

# Check whether all minor blocks are ordered, validated (and linked to previous block)
shard_id = 0
# prev_last_minor_block_header_list can be empty for genesis root block
prev_header = (
prev_last_minor_block_header_list[0]
if prev_last_minor_block_header_list
headers_map = dict() # shard_id -> List[MinorBlockHeader]
shard_id = (
block.minor_block_header_list[0].branch.get_shard_id()
if block.minor_block_header_list
else None
)
last_minor_block_header_list = []
block_count_in_shard = 0
for idx, m_header in enumerate(block.minor_block_header_list):
if m_header.branch.get_shard_id() != shard_id:
if m_header.branch.get_shard_id() != shard_id + 1:
raise ValueError("shard id must be ordered")
if (
block_count_in_shard
< self.env.quark_chain_config.PROOF_OF_PROGRESS_BLOCKS
):
raise ValueError("fail to prove progress")
if m_header.create_time > block.header.create_time:
raise ValueError(
"minor block create time is too large {}>{}".format(
m_header.create_time, block.header.create_time
)
)
if prev_header and not self.__is_same_chain(
self.db.get_root_block_header_by_hash(block.header.hash_prev_block),
self.db.get_root_block_header_by_hash(
prev_header.hash_prev_root_block
),
):
raise ValueError(
"minor block's prev root block must be in the same chain"
)

last_minor_block_header_list.append(
block.minor_block_header_list[idx - 1]
)
shard_id += 1
block_count_in_shard = 0
prev_header = (
prev_last_minor_block_header_list[shard_id]
if prev_last_minor_block_header_list
else None
)

for m_header in block.minor_block_header_list:
if not self.db.contain_minor_block_by_hash(m_header.get_hash()):
raise ValueError(
"minor block is not validated. {}-{}".format(
m_header.branch.get_shard_id(), m_header.height
)
)
if m_header.create_time > block.header.create_time:
raise ValueError(
"minor block create time is larger than root block {} > {}".format(
m_header.create_time, block.header.create_time
)
)
if not self.__is_same_chain(
self.db.get_root_block_header_by_hash(block.header.hash_prev_block),
self.db.get_root_block_header_by_hash(m_header.hash_prev_root_block),
):
raise ValueError(
"minor block's prev root block must be in the same chain"
)

if prev_header and m_header.hash_prev_minor_block != prev_header.get_hash():
raise ValueError("minor block doesn't link to previous minor block")
if m_header.branch.get_shard_id() < shard_id:
raise ValueError("shard id must be ordered")
elif m_header.branch.get_shard_id() > shard_id:
shard_id = m_header.branch.get_shard_id()

block_count_in_shard += 1
prev_header = m_header
headers_map.setdefault(m_header.branch.get_shard_id(), []).append(m_header)
# TODO: Add coinbase

if (
shard_id != block.header.shard_info.get_shard_size() - 1
and self.env.quark_chain_config.PROOF_OF_PROGRESS_BLOCKS != 0
):
raise ValueError("fail to prove progress")
if block_count_in_shard < self.env.quark_chain_config.PROOF_OF_PROGRESS_BLOCKS:
raise ValueError("fail to prove progress")
if m_header.create_time > block.header.create_time:
raise ValueError(
"minor block create time is too large {}>{}".format(
m_header.create_time, block.header.create_time
# check proof of progress
shard_ids_to_check_proof_of_progress = self.env.quark_chain_config.get_initialized_shard_ids_before_root_height(
block.header.height
)
for shard_id in shard_ids_to_check_proof_of_progress:
if (
len(headers_map.get(shard_id, []))
< self.env.quark_chain_config.PROOF_OF_PROGRESS_BLOCKS
):
raise ValueError("fail to prove progress")

# check minor block headers are linked
prev_last_minor_block_header_list = self.db.get_root_block_last_minor_block_header_list(
block.header.hash_prev_block
)
prev_header_map = dict() # shard_id -> MinorBlockHeader or None
for header in prev_last_minor_block_header_list:
prev_header_map[header.branch.get_shard_id()] = header

last_minor_block_header_list = []
for shard_id, headers in headers_map.items():
check(len(headers) > 0)

last_minor_block_header_list.append(headers[-1])

if shard_id not in shard_ids_to_check_proof_of_progress:
raise ValueError(
"found minor block header in root block {} for uninitialized shard {}".format(
block_hash.hex(), shard_id
)
)
)
if not self.__is_same_chain(
self.db.get_root_block_header_by_hash(block.header.hash_prev_block),
self.db.get_root_block_header_by_hash(m_header.hash_prev_root_block),
):
raise ValueError("minor block's prev root block must be in the same chain")
last_minor_block_header_list.append(m_header)
prev_header_in_last_root_block = prev_header_map.get(shard_id, None)
if not prev_header_in_last_root_block:
pass
# no header in previous root block then it must start with genesis block
if headers[0].height != 0:
raise ValueError(
"genesis block height is not 0 for shard {} block hash {}".format(
shard_id, headers[0].get_hash().hex()
)
)
else:
headers = [prev_header_in_last_root_block] + headers
for i in range(len(headers) - 1):
if headers[i + 1].hash_prev_minor_block != headers[i].get_hash():
raise ValueError(
"minor block {} does not link to previous block {}".format(
headers[i + 1].get_hash(), headers[i].get_hash()
)
)

return block_hash, last_minor_block_header_list

Expand Down
2 changes: 1 addition & 1 deletion quarkchain/cluster/rpc.py
Expand Up @@ -30,7 +30,7 @@ class Ping(Serializable):
FIELDS = [
("id", PrependedSizeBytesSerializer(4)),
("shard_mask_list", PrependedSizeListSerializer(4, ShardMask)),
("root_tip", RootBlock),
("root_tip", Optional(RootBlock)), # Initialize ShardState if not None
]

def __init__(self, id, shard_mask_list, root_tip):
Expand Down
55 changes: 39 additions & 16 deletions quarkchain/cluster/shard.py
Expand Up @@ -17,7 +17,7 @@
from quarkchain.cluster.tx_generator import TransactionGenerator
from quarkchain.cluster.protocol import VirtualConnection, ClusterMetadata
from quarkchain.cluster.shard_state import ShardState
from quarkchain.core import RootBlock, MinorBlockHeader, Branch, Transaction
from quarkchain.core import RootBlock, MinorBlock, MinorBlockHeader, Branch, Transaction
from quarkchain.utils import Logger, check, time_ms
from quarkchain.db import InMemoryDb, PersistentDb

Expand Down Expand Up @@ -366,9 +366,8 @@ def __init__(self, env, shard_id, slave):
# the block that has been added locally but not have been fully propagated will have an entry here
self.add_block_futures = dict()

self.tx_generator = TransactionGenerator(
self.env.quark_chain_config, shard_id, slave, Branch.create(self.__get_shard_size(), self.shard_id)
)
self.tx_generator = TransactionGenerator(self.env.quark_chain_config, self)

self.__init_miner()

def __init_shard_db(self):
Expand All @@ -390,7 +389,7 @@ def __init_miner(self):

async def __create_block():
# hold off mining if the shard is syncing
while self.synchronizer.running:
while self.synchronizer.running or not self.state.initialized:
await asyncio.sleep(0.1)

return self.state.create_block_to_mine(address=miner_address)
Expand Down Expand Up @@ -421,9 +420,30 @@ def __get_shard_size(self):
def add_peer(self, peer: PeerShardConnection):
self.peers[peer.cluster_peer_id] = peer

def add_root_block(self, block: RootBlock):
""" Create"""
pass
async def __init_genesis_state(self, root_block: RootBlock):
block = self.state.init_genesis_state(root_block)
xshard_list = []
await self.slave.broadcast_xshard_tx_list(
block, xshard_list, root_block.header.height
)
await self.slave.send_minor_block_header_to_master(
block.header,
len(block.tx_list),
len(xshard_list),
self.state.get_shard_stats(),
)

async def init_from_root_block(self, root_block: RootBlock):
""" Either recover state from local db or create genesis state based on config"""
height = self.env.quark_chain_config.get_genesis_root_height(self.shard_id)
if root_block.header.height > height:
return self.state.init_from_root_block(root_block)

if root_block.header.height == height:
await self.__init_genesis_state(root_block)

async def add_root_block(self, root_block: RootBlock):
return self.state.add_root_block(root_block)

def broadcast_new_block(self, block):
for cluster_peer_id, peer in self.peers.items():
Expand Down Expand Up @@ -479,9 +499,7 @@ async def add_block(self, block):
""" Returns true if block is successfully added. False on any error.
called by 1. local miner (will not run if syncing) 2. SyncTask
"""
branch_value = block.header.branch.value

old_tip = self.state.tip()
old_tip = self.state.header_tip
try:
xshard_list = self.state.add_block(block)
except Exception as e:
Expand All @@ -494,7 +512,7 @@ async def add_block(self, block):
self.state.new_block_pool.pop(block.header.get_hash(), None)
# block has been added to local state, broadcast tip so that peers can sync if needed
try:
if old_tip != self.state.tip():
if old_tip != self.state.header_tip:
self.broadcast_new_tip()
except Exception:
Logger.warning_every_sec("broadcast tip failure", 1)
Expand All @@ -518,8 +536,10 @@ async def add_block(self, block):
# Start mining new one before propagating inside cluster
# The propagation should be done by the time the new block is mined
self.miner.mine_new_block_async()

await self.slave.broadcast_xshard_tx_list(block, xshard_list)
prev_root_height = self.state.db.get_root_block_by_hash(
block.header.hash_prev_root_block
).header.height
await self.slave.broadcast_xshard_tx_list(block, xshard_list, prev_root_height)
await self.slave.send_minor_block_header_to_master(
block.header,
len(block.tx_list),
Expand Down Expand Up @@ -562,7 +582,10 @@ async def add_block_list_for_sync(self, block_list):
if future:
existing_add_block_futures.append(future)
else:
block_hash_to_x_shard_list[block_hash] = xshard_list
prev_root_height = self.state.db.get_root_block_by_hash(
block.header.hash_prev_root_block
).header.height
block_hash_to_x_shard_list[block_hash] = (xshard_list, prev_root_height)
self.add_block_futures[block_hash] = self.loop.create_future()

await self.slave.batch_broadcast_xshard_tx_list(
Expand All @@ -586,7 +609,7 @@ def add_tx_list(self, tx_list, source_peer=None):
valid_tx_list.append(tx)
if not valid_tx_list:
return
self.broadcast_tx_list( valid_tx_list, source_peer)
self.broadcast_tx_list(valid_tx_list, source_peer)

def add_tx(self, tx: Transaction):
return self.state.add_tx(tx)