diff --git a/quarkchain/cluster/cluster_config.py b/quarkchain/cluster/cluster_config.py index 7d0243ec2..defe16ad7 100644 --- a/quarkchain/cluster/cluster_config.py +++ b/quarkchain/cluster/cluster_config.py @@ -259,7 +259,6 @@ def create_from_args(cls, args): config.LOADTEST = args.loadtest update_genesis_config(config.QUARKCHAIN, config.LOADTEST) - GenesisManager.finalize_config(config.QUARKCHAIN) config.MONITORING.KAFKA_REST_ADDRESS = args.monitoring_kafka_rest_address diff --git a/quarkchain/cluster/master.py b/quarkchain/cluster/master.py index 30bbd9470..0551e64f6 100644 --- a/quarkchain/cluster/master.py +++ b/quarkchain/cluster/master.py @@ -300,8 +300,13 @@ def has_overlap(self, shard_mask): return True return False - async def send_ping(self): - req = Ping("", [], self.master_server.root_state.get_tip_block()) + async def send_ping(self, initialize_shard_state=False): + root_block = ( + self.master_server.root_state.get_tip_block() + if initialize_shard_state + else None + ) + req = Ping("", [], root_block) op, resp, rpc_id = await self.write_rpc_request( op=ClusterOp.PING, cmd=req, @@ -622,6 +627,12 @@ async def __setup_slave_to_slave_connections(self): if not success: self.shutdown() + async def __init_shards(self): + futures = [] + for slave in self.slave_pool: + futures.append(slave.send_ping(initialize_shard_state=True)) + await asyncio.gather(*futures) + async def __send_mining_config_to_slaves(self, mining): futures = [] for slave in self.slave_pool: @@ -666,6 +677,7 @@ async def __init_cluster(self): Logger.error("Missing some shards. Check cluster config file!") return await self.__setup_slave_to_slave_connections() + await self.__init_shards() self.cluster_active_future.set_result(None) @@ -739,7 +751,10 @@ async def __create_root_block_to_mine_or_fallback_to_minor_block(self, address): header_list = [] # check proof of progress - for shard_id in range(self.__get_shard_size()): + shard_ids_to_check = self.env.quark_chain_config.get_initialized_shard_ids_before_root_height( + self.root_state.tip.height + 1 + ) + for shard_id in shard_ids_to_check: headers = shard_id_to_header_list.get(shard_id, []) header_list.extend(headers) if len(headers) < self.env.quark_chain_config.PROOF_OF_PROGRESS_BLOCKS: @@ -872,7 +887,7 @@ async def get_account_data(self, address: Address): account_branch_data.branch ] = account_branch_data - check(len(branch_to_account_branch_data) == self.__get_shard_size()) + check(len(branch_to_account_branch_data) == len(self.env.quark_chain_config.get_genesis_shard_ids())) return branch_to_account_branch_data async def get_primary_account_data( diff --git a/quarkchain/cluster/root_state.py b/quarkchain/cluster/root_state.py index 13268697e..837d4a7bf 100644 --- a/quarkchain/cluster/root_state.py +++ b/quarkchain/cluster/root_state.py @@ -171,8 +171,6 @@ def __create_genesis_block(self): genesis_block = genesis_manager.create_root_block() self.db.put_root_block(genesis_block, []) self.db.put_root_block_index(genesis_block) - for i in range(genesis_block.header.shard_info.get_shard_size()): - self.db.put_minor_block_hash(genesis_manager.get_minor_block_hash(i)) self.tip = genesis_block.header def get_tip_block(self): @@ -259,15 +257,6 @@ def validate_block(self, block, block_hash=None): if not self.db.contain_root_block_by_hash(block.header.hash_prev_block): raise ValueError("previous hash block mismatch") - prev_last_minor_block_header_list = self.db.get_root_block_last_minor_block_header_list( - block.header.hash_prev_block - ) - - if block.header.height > 1: - check(len(prev_last_minor_block_header_list) > 0) - else: - check(len(prev_last_minor_block_header_list) == 0) - block_hash = self.validate_block_header(block.header, block_hash) # Check the merkle tree @@ -276,84 +265,91 @@ def validate_block(self, block, block_hash=None): raise ValueError("incorrect merkle root") # Check whether all minor blocks are ordered, validated (and linked to previous block) - shard_id = 0 - # prev_last_minor_block_header_list can be empty for genesis root block - prev_header = ( - prev_last_minor_block_header_list[0] - if prev_last_minor_block_header_list + headers_map = dict() # shard_id -> List[MinorBlockHeader] + shard_id = ( + block.minor_block_header_list[0].branch.get_shard_id() + if block.minor_block_header_list else None ) - last_minor_block_header_list = [] - block_count_in_shard = 0 - for idx, m_header in enumerate(block.minor_block_header_list): - if m_header.branch.get_shard_id() != shard_id: - if m_header.branch.get_shard_id() != shard_id + 1: - raise ValueError("shard id must be ordered") - if ( - block_count_in_shard - < self.env.quark_chain_config.PROOF_OF_PROGRESS_BLOCKS - ): - raise ValueError("fail to prove progress") - if m_header.create_time > block.header.create_time: - raise ValueError( - "minor block create time is too large {}>{}".format( - m_header.create_time, block.header.create_time - ) - ) - if prev_header and not self.__is_same_chain( - self.db.get_root_block_header_by_hash(block.header.hash_prev_block), - self.db.get_root_block_header_by_hash( - prev_header.hash_prev_root_block - ), - ): - raise ValueError( - "minor block's prev root block must be in the same chain" - ) - - last_minor_block_header_list.append( - block.minor_block_header_list[idx - 1] - ) - shard_id += 1 - block_count_in_shard = 0 - prev_header = ( - prev_last_minor_block_header_list[shard_id] - if prev_last_minor_block_header_list - else None - ) - + for m_header in block.minor_block_header_list: if not self.db.contain_minor_block_by_hash(m_header.get_hash()): raise ValueError( "minor block is not validated. {}-{}".format( m_header.branch.get_shard_id(), m_header.height ) ) + if m_header.create_time > block.header.create_time: + raise ValueError( + "minor block create time is larger than root block {} > {}".format( + m_header.create_time, block.header.create_time + ) + ) + if not self.__is_same_chain( + self.db.get_root_block_header_by_hash(block.header.hash_prev_block), + self.db.get_root_block_header_by_hash(m_header.hash_prev_root_block), + ): + raise ValueError( + "minor block's prev root block must be in the same chain" + ) - if prev_header and m_header.hash_prev_minor_block != prev_header.get_hash(): - raise ValueError("minor block doesn't link to previous minor block") + if m_header.branch.get_shard_id() < shard_id: + raise ValueError("shard id must be ordered") + elif m_header.branch.get_shard_id() > shard_id: + shard_id = m_header.branch.get_shard_id() - block_count_in_shard += 1 - prev_header = m_header + headers_map.setdefault(m_header.branch.get_shard_id(), []).append(m_header) # TODO: Add coinbase - if ( - shard_id != block.header.shard_info.get_shard_size() - 1 - and self.env.quark_chain_config.PROOF_OF_PROGRESS_BLOCKS != 0 - ): - raise ValueError("fail to prove progress") - if block_count_in_shard < self.env.quark_chain_config.PROOF_OF_PROGRESS_BLOCKS: - raise ValueError("fail to prove progress") - if m_header.create_time > block.header.create_time: - raise ValueError( - "minor block create time is too large {}>{}".format( - m_header.create_time, block.header.create_time + # check proof of progress + shard_ids_to_check_proof_of_progress = self.env.quark_chain_config.get_initialized_shard_ids_before_root_height( + block.header.height + ) + for shard_id in shard_ids_to_check_proof_of_progress: + if ( + len(headers_map.get(shard_id, [])) + < self.env.quark_chain_config.PROOF_OF_PROGRESS_BLOCKS + ): + raise ValueError("fail to prove progress") + + # check minor block headers are linked + prev_last_minor_block_header_list = self.db.get_root_block_last_minor_block_header_list( + block.header.hash_prev_block + ) + prev_header_map = dict() # shard_id -> MinorBlockHeader or None + for header in prev_last_minor_block_header_list: + prev_header_map[header.branch.get_shard_id()] = header + + last_minor_block_header_list = [] + for shard_id, headers in headers_map.items(): + check(len(headers) > 0) + + last_minor_block_header_list.append(headers[-1]) + + if shard_id not in shard_ids_to_check_proof_of_progress: + raise ValueError( + "found minor block header in root block {} for uninitialized shard {}".format( + block_hash.hex(), shard_id + ) ) - ) - if not self.__is_same_chain( - self.db.get_root_block_header_by_hash(block.header.hash_prev_block), - self.db.get_root_block_header_by_hash(m_header.hash_prev_root_block), - ): - raise ValueError("minor block's prev root block must be in the same chain") - last_minor_block_header_list.append(m_header) + prev_header_in_last_root_block = prev_header_map.get(shard_id, None) + if not prev_header_in_last_root_block: + pass + # no header in previous root block then it must start with genesis block + if headers[0].height != 0: + raise ValueError( + "genesis block height is not 0 for shard {} block hash {}".format( + shard_id, headers[0].get_hash().hex() + ) + ) + else: + headers = [prev_header_in_last_root_block] + headers + for i in range(len(headers) - 1): + if headers[i + 1].hash_prev_minor_block != headers[i].get_hash(): + raise ValueError( + "minor block {} does not link to previous block {}".format( + headers[i + 1].get_hash(), headers[i].get_hash() + ) + ) return block_hash, last_minor_block_header_list diff --git a/quarkchain/cluster/rpc.py b/quarkchain/cluster/rpc.py index 621f0120b..a5584c86b 100644 --- a/quarkchain/cluster/rpc.py +++ b/quarkchain/cluster/rpc.py @@ -30,7 +30,7 @@ class Ping(Serializable): FIELDS = [ ("id", PrependedSizeBytesSerializer(4)), ("shard_mask_list", PrependedSizeListSerializer(4, ShardMask)), - ("root_tip", RootBlock), + ("root_tip", Optional(RootBlock)), # Initialize ShardState if not None ] def __init__(self, id, shard_mask_list, root_tip): diff --git a/quarkchain/cluster/shard.py b/quarkchain/cluster/shard.py index 9dc778e75..00f5da5d6 100644 --- a/quarkchain/cluster/shard.py +++ b/quarkchain/cluster/shard.py @@ -17,7 +17,7 @@ from quarkchain.cluster.tx_generator import TransactionGenerator from quarkchain.cluster.protocol import VirtualConnection, ClusterMetadata from quarkchain.cluster.shard_state import ShardState -from quarkchain.core import RootBlock, MinorBlockHeader, Branch, Transaction +from quarkchain.core import RootBlock, MinorBlock, MinorBlockHeader, Branch, Transaction from quarkchain.utils import Logger, check, time_ms from quarkchain.db import InMemoryDb, PersistentDb @@ -366,9 +366,8 @@ def __init__(self, env, shard_id, slave): # the block that has been added locally but not have been fully propagated will have an entry here self.add_block_futures = dict() - self.tx_generator = TransactionGenerator( - self.env.quark_chain_config, shard_id, slave, Branch.create(self.__get_shard_size(), self.shard_id) - ) + self.tx_generator = TransactionGenerator(self.env.quark_chain_config, self) + self.__init_miner() def __init_shard_db(self): @@ -390,7 +389,7 @@ def __init_miner(self): async def __create_block(): # hold off mining if the shard is syncing - while self.synchronizer.running: + while self.synchronizer.running or not self.state.initialized: await asyncio.sleep(0.1) return self.state.create_block_to_mine(address=miner_address) @@ -421,9 +420,30 @@ def __get_shard_size(self): def add_peer(self, peer: PeerShardConnection): self.peers[peer.cluster_peer_id] = peer - def add_root_block(self, block: RootBlock): - """ Create""" - pass + async def __init_genesis_state(self, root_block: RootBlock): + block = self.state.init_genesis_state(root_block) + xshard_list = [] + await self.slave.broadcast_xshard_tx_list( + block, xshard_list, root_block.header.height + ) + await self.slave.send_minor_block_header_to_master( + block.header, + len(block.tx_list), + len(xshard_list), + self.state.get_shard_stats(), + ) + + async def init_from_root_block(self, root_block: RootBlock): + """ Either recover state from local db or create genesis state based on config""" + height = self.env.quark_chain_config.get_genesis_root_height(self.shard_id) + if root_block.header.height > height: + return self.state.init_from_root_block(root_block) + + if root_block.header.height == height: + await self.__init_genesis_state(root_block) + + async def add_root_block(self, root_block: RootBlock): + return self.state.add_root_block(root_block) def broadcast_new_block(self, block): for cluster_peer_id, peer in self.peers.items(): @@ -479,9 +499,7 @@ async def add_block(self, block): """ Returns true if block is successfully added. False on any error. called by 1. local miner (will not run if syncing) 2. SyncTask """ - branch_value = block.header.branch.value - - old_tip = self.state.tip() + old_tip = self.state.header_tip try: xshard_list = self.state.add_block(block) except Exception as e: @@ -494,7 +512,7 @@ async def add_block(self, block): self.state.new_block_pool.pop(block.header.get_hash(), None) # block has been added to local state, broadcast tip so that peers can sync if needed try: - if old_tip != self.state.tip(): + if old_tip != self.state.header_tip: self.broadcast_new_tip() except Exception: Logger.warning_every_sec("broadcast tip failure", 1) @@ -518,8 +536,10 @@ async def add_block(self, block): # Start mining new one before propagating inside cluster # The propagation should be done by the time the new block is mined self.miner.mine_new_block_async() - - await self.slave.broadcast_xshard_tx_list(block, xshard_list) + prev_root_height = self.state.db.get_root_block_by_hash( + block.header.hash_prev_root_block + ).header.height + await self.slave.broadcast_xshard_tx_list(block, xshard_list, prev_root_height) await self.slave.send_minor_block_header_to_master( block.header, len(block.tx_list), @@ -562,7 +582,10 @@ async def add_block_list_for_sync(self, block_list): if future: existing_add_block_futures.append(future) else: - block_hash_to_x_shard_list[block_hash] = xshard_list + prev_root_height = self.state.db.get_root_block_by_hash( + block.header.hash_prev_root_block + ).header.height + block_hash_to_x_shard_list[block_hash] = (xshard_list, prev_root_height) self.add_block_futures[block_hash] = self.loop.create_future() await self.slave.batch_broadcast_xshard_tx_list( @@ -586,7 +609,7 @@ def add_tx_list(self, tx_list, source_peer=None): valid_tx_list.append(tx) if not valid_tx_list: return - self.broadcast_tx_list( valid_tx_list, source_peer) + self.broadcast_tx_list(valid_tx_list, source_peer) def add_tx(self, tx: Transaction): return self.state.add_tx(tx) diff --git a/quarkchain/cluster/shard_db_operator.py b/quarkchain/cluster/shard_db_operator.py index e34cbb2fb..851a32558 100644 --- a/quarkchain/cluster/shard_db_operator.py +++ b/quarkchain/cluster/shard_db_operator.py @@ -174,7 +174,12 @@ def __init__(self, db, env, branch: Branch): def __get_last_minor_block_in_root_block(self, root_block): # genesis root block contains no minor block header - if root_block.header.height == 0: + if ( + root_block.header.height + == self.env.quark_chain_config.get_genesis_root_height( + self.branch.get_shard_id() + ) + ): return None l_header = None @@ -203,7 +208,12 @@ def recover_state(self, r_header, m_header): r_hash ] = self.__get_last_minor_block_in_root_block(block) self.r_header_pool[r_hash] = block.header - if block.header.height <= 0: + if ( + block.header.height + <= self.env.quark_chain_config.get_genesis_root_height( + self.branch.get_shard_id() + ) + ): break r_hash = block.header.hash_prev_block diff --git a/quarkchain/cluster/shard_state.py b/quarkchain/cluster/shard_state.py index 99a4968e0..693e69cf4 100644 --- a/quarkchain/cluster/shard_state.py +++ b/quarkchain/cluster/shard_state.py @@ -76,10 +76,6 @@ def __init__(self, env, shard_id, db=None, diff_calc=None): last_price=0, last_head=b"", check_blocks=5, percentile=50 ) - # assure ShardState is in good shape after constructor returns though we still - # rely on master calling init_from_root_block to bring the cluster into consistency - self.__init_genesis_state(shard_id) - # new blocks that passed POW validation and should be made available to whole network self.new_block_pool = dict() @@ -99,19 +95,21 @@ def __get_header_tip_from_root_block(branch): check(header_tip is not None) return header_tip + check( + root_block.header.height + > self.env.quark_chain_config.get_genesis_root_height(self.shard_id) + ) check(not self.initialized) self.initialized = True Logger.info( - "Initializing shard state from root height {} hash {}".format( - root_block.header.height, root_block.header.get_hash().hex() + "[{}] Initializing shard state from root height {} hash {}".format( + self.shard_id, + root_block.header.height, + root_block.header.get_hash().hex(), ) ) - if root_block.header.height <= 0: - Logger.info("Start from genesis block") - return - shard_size = root_block.header.shard_info.get_shard_size() check(self.branch == Branch.create(shard_size, self.shard_id)) self.root_tip = root_block.header @@ -119,8 +117,8 @@ def __get_header_tip_from_root_block(branch): self.db.recover_state(self.root_tip, self.header_tip) Logger.info( - "[{}] done recovery from db. shard tip {} {} root tip {} {}".format( - self.branch.get_shard_id(), + "[{}] Done recovery from db. shard tip {} {}, root tip {} {}".format( + self.shard_id, self.header_tip.height, self.header_tip.get_hash().hex(), self.root_tip.height, @@ -145,39 +143,44 @@ def __get_header_tip_from_root_block(branch): def __create_evm_state(self): return EvmState(env=self.env.evm_env, db=self.raw_db) - def __init_genesis_state(self, shard_id): - genesis_manager = GenesisManager(self.env.quark_chain_config) + def init_genesis_state(self, root_block): + """ root_block should have the same height as configured in shard GENESIS """ + height = self.env.quark_chain_config.get_genesis_root_height(self.shard_id) + check(root_block.header.height == height) - # no need to recreate if the db already has it - genesis_block = self.db.get_minor_block_by_height(0) - if not genesis_block: - genesis_block = genesis_manager.create_minor_block( - shard_id, self.__create_evm_state() - ) - check( - genesis_block.header.get_hash() - == genesis_manager.get_minor_block_hash(self.branch.get_shard_id()) + genesis_manager = GenesisManager(self.env.quark_chain_config) + genesis_block = genesis_manager.create_minor_block( + root_block, self.shard_id, self.__create_evm_state() ) - genesis_root = genesis_manager.create_root_block() self.db.put_minor_block(genesis_block, []) self.db.put_minor_block_index(genesis_block) - self.db.put_root_block(genesis_root) + self.db.put_root_block(root_block) - for i in range(self.branch.get_shard_size()): - self.db.put_minor_block_xshard_tx_list( - genesis_manager.get_minor_block_hash(i), CrossShardTransactionList([]) - ) + if self.initialized: + # already initialized. just return the block without resetting the state. + return genesis_block self.evm_state = self.__create_evm_state() self.evm_state.trie.root_hash = genesis_block.meta.hash_evm_state_root - self.root_tip = genesis_root.header + self.root_tip = root_block.header # Tips that are confirmed by root self.confirmed_header_tip = None # Tips that are unconfirmed by root self.header_tip = genesis_block.header self.meta_tip = genesis_block.meta + Logger.info( + "[{}] Initialized genensis state at root block {} {}, genesis block hash {}".format( + self.shard_id, + self.root_tip.height, + self.root_tip.get_hash().hex(), + self.header_tip.get_hash().hex(), + ) + ) + self.initialized = True + return genesis_block + def __validate_tx( self, tx: Transaction, evm_state, from_address=None, gas=None ) -> EvmTransaction: @@ -667,20 +670,13 @@ def add_block(self, block): ) return evm_state.xshard_list - def get_tip(self): + def get_tip(self) -> MinorBlock: return self.db.get_minor_block_by_hash(self.header_tip.get_hash()) - def tip(self): - """ Called in diff.py """ - return self.header_tip - def finalize_and_add_block(self, block): block.finalize(evm_state=self.run_block(block)) self.add_block(block) - def get_block_header_by_height(self, height): - pass - def get_balance(self, recipient: bytes, height: Optional[int] = None) -> int: evm_state = self._get_evm_state_from_height(height) if not evm_state: @@ -918,7 +914,13 @@ def add_cross_shard_tx_list_by_minor_block_hash( def add_root_block(self, root_block): """ Add a root block. Make sure all cross shard tx lists of remote shards confirmed by the root block are in local db. + Return True if the new block become head else False. + Raise ValueError on any failure. """ + check( + root_block.header.height + > self.env.quark_chain_config.get_genesis_root_height(self.shard_id) + ) if not self.db.contain_root_block_by_hash(root_block.header.hash_prev_block): raise ValueError("cannot find previous root block in pool") @@ -936,11 +938,19 @@ def add_root_block(self, root_block): continue if not self.db.contain_remote_minor_block_hash(h): - raise ValueError( - "cannot find x_shard tx list for {}-{} {}".format( - m_header.branch.get_shard_id(), m_header.height, h.hex() - ) + prev_root = self.db.get_root_block_by_hash( + m_header.hash_prev_root_block ) + if ( + prev_root + and prev_root.header.height + > self.env.quark_chain_config.get_genesis_root_height(self.shard_id) + ): + raise ValueError( + "cannot find x_shard tx list for {}-{} {}".format( + m_header.branch.get_shard_id(), m_header.height, h.hex() + ) + ) # shard_header cannot be None since PROOF_OF_PROGRESS should be positive check(shard_header is not None) @@ -1027,8 +1037,18 @@ def __get_cross_shard_tx_list_by_root_block_hash(self, h): if not self.__is_neighbor(m_header.branch): continue - h = m_header.get_hash() - tx_list.extend(self.db.get_minor_block_xshard_tx_list(h).tx_list) + xshard_tx_list = self.db.get_minor_block_xshard_tx_list(m_header.get_hash()) + prev_root = self.db.get_root_block_by_hash( + m_header.hash_prev_root_block + ) + if ( + not prev_root + or prev_root.header.height + <= self.env.quark_chain_config.get_genesis_root_height(self.shard_id) + ): + check(xshard_tx_list is None) + continue + tx_list.extend(xshard_tx_list.tx_list) # Apply root block coinbase if self.branch.is_in_shard(r_block.header.coinbase_address.full_shard_id): diff --git a/quarkchain/cluster/slave.py b/quarkchain/cluster/slave.py index a872ea08a..b0e3deb8b 100644 --- a/quarkchain/cluster/slave.py +++ b/quarkchain/cluster/slave.py @@ -158,7 +158,8 @@ def close_connection(self, conn): # Cluster RPC handlers async def handle_ping(self, ping): - self.slave_server.init_shard_states(ping.root_tip) + if ping.root_tip: + await self.slave_server.init_shards(ping.root_tip) return Pong(self.slave_server.id, self.slave_server.shard_mask_list) async def handle_connect_to_slaves_request(self, connect_to_slave_request): @@ -194,20 +195,23 @@ async def handle_add_root_block_request(self, req): # TODO: handle expect_switch error_code = 0 switched = False + # TODO: asyncio.gather for shard in self.shards.values(): try: - switched = shard.state.add_root_block(req.root_block) + switched = await shard.add_root_block(req.root_block) except ValueError: Logger.log_exception() - # TODO: May be enum or Unix errno? - error_code = errno.EBADMSG - break + return AddRootBlockResponse(errno.EBADMSG, False) + + await self.slave_server.init_shards(req.root_block) return AddRootBlockResponse(error_code, switched) async def handle_get_eco_info_list_request(self, _req): eco_info_list = [] for branch, shard in self.shards.items(): + if not shard.state.initialized: + continue eco_info_list.append( EcoInfo( branch=branch, @@ -251,6 +255,8 @@ async def handle_add_minor_block_request(self, req): async def handle_get_unconfirmed_header_list_request(self, _req): headers_info_list = [] for branch, shard in self.shards.items(): + if not shard.state.initialized: + continue headers_info_list.append( HeadersInfo( branch=branch, header_list=shard.state.get_unconfirmed_header_list() @@ -752,38 +758,42 @@ def __init__(self, env, name="slave"): self.master = None self.name = name + self.mining = False self.artificial_tx_config = None self.shards = dict() # type: Dict[Branch, Shard] - self.__init_shards() self.shutdown_in_progress = False # block hash -> future (that will return when the block is fully propagated in the cluster) # the block that has been added locally but not have been fully propagated will have an entry here self.add_block_futures = dict() - def __init_shards(self): - """ branch_value -> ShardState mapping """ - shard_size = self.__get_shard_size() - branch_values = set() + def cover_shard_id(self, shard_id): for shard_mask in self.shard_mask_list: - for shard_id in shard_mask.iterate(shard_size): - branch_value = shard_id + shard_size - branch_values.add(branch_value) - - for branch_value in branch_values: - branch = Branch(branch_value) - shard_id = branch.get_shard_id() - self.shards[branch] = Shard(self.env, shard_id, self) + if shard_mask.contain_shard_id(shard_id): + return True + return False - def init_shard_states(self, root_tip): - """ Will be called when master connects to slaves """ - for _, shard in self.shards.items(): - # TODO: shard.init_from_root_block - shard.state.init_from_root_block(root_tip) + async def init_shards(self, root_block: RootBlock): + """ Create shards based on GENESIS config and root block height """ + # TODO: asyncio.gather + for shard_id, shard_config in enumerate(self.env.quark_chain_config.SHARD_LIST): + branch = Branch.create(self.env.quark_chain_config.SHARD_SIZE, shard_id) + if branch in self.shards: + continue + if not self.cover_shard_id(shard_id) or not shard_config.GENESIS: + continue + if root_block.header.height >= shard_config.GENESIS.ROOT_HEIGHT: + shard = Shard(self.env, shard_id, self) + await shard.init_from_root_block(root_block) + self.shards[branch] = shard + if self.mining: + shard.miner.enable() + shard.miner.mine_new_block_async() def start_mining(self, artificial_tx_config): self.artificial_tx_config = artificial_tx_config + self.mining = True for branch, shard in self.shards.items(): Logger.info( "[{}] start mining with target minor block time {} seconds".format( @@ -798,6 +808,7 @@ def create_transactions(self, num_tx_per_shard, x_shard_percent, tx: Transaction shard.tx_generator.generate(num_tx_per_shard, x_shard_percent, tx) def stop_mining(self): + self.mining = False for branch, shard in self.shards.items(): Logger.info("[{}] stop mining".format(branch.get_shard_id())) shard.miner.disable() @@ -864,33 +875,40 @@ async def send_minor_block_header_to_master( check(resp.error_code == 0) self.artificial_tx_config = resp.artificial_tx_config - def __get_branch_to_add_xshard_tx_list_request(self, block_hash, xshard_tx_list): - branch_to_add_xshard_tx_list_request = dict() + def __get_branch_to_add_xshard_tx_list_request( + self, block_hash, xshard_tx_list, prev_root_height + ): + xshard_map = dict() # type: Dict[Branch, List[CrossShardTransactionDeposit]] - xshard_map = dict() - for shard_id in range(self.__get_shard_size()): - xshard_map[shard_id + self.__get_shard_size()] = [] + # only broadcast to the shards that have been initialized + initialized_shard_ids = self.env.quark_chain_config.get_initialized_shard_ids_before_root_height( + prev_root_height + ) + for shard_id in initialized_shard_ids: + branch = Branch.create(self.__get_shard_size(), shard_id) + xshard_map[branch] = [] for xshard_tx in xshard_tx_list: shard_id = xshard_tx.to_address.get_shard_id(self.__get_shard_size()) - branch_value = Branch.create(self.__get_shard_size(), shard_id).value - xshard_map[branch_value].append(xshard_tx) + branch = Branch.create(self.__get_shard_size(), shard_id) + check(branch in xshard_map) + xshard_map[branch].append(xshard_tx) - for branch_value, tx_list in xshard_map.items(): + branch_to_add_xshard_tx_list_request = dict() # type: Dict[Branch, AddXshardTxListRequest] + for branch, tx_list in xshard_map.items(): cross_shard_tx_list = CrossShardTransactionList(tx_list) - branch = Branch(branch_value) request = AddXshardTxListRequest(branch, block_hash, cross_shard_tx_list) branch_to_add_xshard_tx_list_request[branch] = request return branch_to_add_xshard_tx_list_request - async def broadcast_xshard_tx_list(self, block, xshard_tx_list): + async def broadcast_xshard_tx_list(self, block, xshard_tx_list, prev_root_height): """ Broadcast x-shard transactions to their recipient shards """ block_hash = block.header.get_hash() branch_to_add_xshard_tx_list_request = self.__get_branch_to_add_xshard_tx_list_request( - block_hash, xshard_tx_list + block_hash, xshard_tx_list, prev_root_height ) rpc_futures = [] for branch, request in branch_to_add_xshard_tx_list_request.items(): @@ -915,12 +933,19 @@ async def broadcast_xshard_tx_list(self, block, xshard_tx_list): check(all([response.error_code == 0 for _, response, _ in responses])) async def batch_broadcast_xshard_tx_list( - self, block_hash_to_xshard_list, source_branch: Branch + self, + block_hash_to_xshard_list_and_prev_root_height: Dict[bytes, Tuple[List, int]], + source_branch: Branch, ): branch_to_add_xshard_tx_list_request_list = dict() - for block_hash, x_shard_list in block_hash_to_xshard_list.items(): + for ( + block_hash, + x_shard_list_and_prev_root_height, + ) in block_hash_to_xshard_list_and_prev_root_height.items(): + xshard_tx_list = x_shard_list_and_prev_root_height[0] + prev_root_height = x_shard_list_and_prev_root_height[1] branch_to_add_xshard_tx_list_request = self.__get_branch_to_add_xshard_tx_list_request( - block_hash, x_shard_list + block_hash, xshard_tx_list, prev_root_height ) for branch, request in branch_to_add_xshard_tx_list_request.items(): if branch == source_branch or not is_neighbor(branch, source_branch): diff --git a/quarkchain/cluster/tests/test_cluster.py b/quarkchain/cluster/tests/test_cluster.py index 48d6fbbd0..a967d591e 100644 --- a/quarkchain/cluster/tests/test_cluster.py +++ b/quarkchain/cluster/tests/test_cluster.py @@ -20,6 +20,40 @@ def test_three_clusters(self): with ClusterContext(3) as clusters: self.assertEqual(len(clusters), 3) + def test_create_shard_at_different_height(self): + acc1 = Address.create_random_account() + with ClusterContext(1, acc1, genesis_root_heights=[1, 2]) as clusters: + master = clusters[0].master + slaves = clusters[0].slave_list + + self.assertEqual(len(slaves[0].shards), 0) + self.assertEqual(len(slaves[1].shards), 0) + + is_root, root = call_async(master.get_next_block_to_mine(acc1)) + self.assertTrue(is_root) + self.assertEqual(len(root.minor_block_header_list), 0) + call_async(master.add_root_block(root)) + + # shard 0 created at root height 1 + self.assertEqual(len(slaves[0].shards), 1) + self.assertEqual(len(slaves[1].shards), 0) + + is_root, root = call_async(master.get_next_block_to_mine(acc1)) + self.assertTrue(is_root) + self.assertEqual(len(root.minor_block_header_list), 1) + call_async(master.add_root_block(root)) + + self.assertEqual(len(slaves[0].shards), 1) + # shard 1 created at root height 2 + self.assertEqual(len(slaves[1].shards), 1) + + # Expect to mine shard 0 due to proof of progress + is_root, block = call_async(master.get_next_block_to_mine(acc1)) + self.assertFalse(is_root) + self.assertEqual(block.header.branch.get_shard_id(), 0) + self.assertEqual(block.header.height, 1) + + def test_get_next_block_to_mine(self): id1 = Identity.create_random_identity() acc1 = Address.create_from_identity(id1, full_shard_id=0) @@ -208,8 +242,8 @@ def test_add_minor_block_request_list(self): ) self.assertTrue(addResult) - # Make sure the xshard list is added to another slave - self.assertTrue( + # Make sure the xshard list is not broadcasted to the other shard + self.assertFalse( clusters[0] .slave_list[1] .shards[Branch(0b11)] @@ -228,12 +262,6 @@ def test_add_minor_block_request_list(self): .shards[Branch(0b10)] .state.contain_block_by_hash(b1.header.get_hash()) ) - assert_true_with_timeout( - lambda: clusters[1] - .slave_list[1] - .shards[Branch(0b11)] - .state.contain_remote_minor_block_hash(b1.header.get_hash()) - ) assert_true_with_timeout( lambda: clusters[1].master.root_state.is_minor_block_validated( b1.header.get_hash() @@ -249,7 +277,7 @@ def test_add_root_block_request_list(self): clusters[1].peer.close() # add blocks in cluster 0 - block_header_list = [] + block_header_list = [clusters[0].get_shard_state(0).header_tip] for i in range(13): shardState0 = clusters[0].slave_list[0].shards[Branch(0b10)].state b1 = shardState0.get_tip().create_block_to_append() @@ -262,6 +290,7 @@ def test_add_root_block_request_list(self): self.assertTrue(addResult) block_header_list.append(b1.header) + block_header_list.append(clusters[0].get_shard_state(1).header_tip) shardState0 = clusters[0].slave_list[1].shards[Branch(0b11)].state b2 = shardState0.get_tip().create_block_to_append() b2.finalize(evm_state=shardState0.run_block(b2)) @@ -411,6 +440,12 @@ def test_broadcast_cross_shard_transactions(self): master = clusters[0].master slaves = clusters[0].slave_list + # Add a root block first so that later minor blocks referring to this root + # can be broadcasted to other shards + is_root, root_block = call_async(master.get_next_block_to_mine(Address.create_empty_account(), prefer_root=True)) + self.assertTrue(is_root) + call_async(master.add_root_block(root_block)) + tx1 = create_transfer_transaction( shard_state=clusters[0].get_shard_state(0), key=id1.get_key(), @@ -503,6 +538,14 @@ def test_broadcast_cross_shard_transactions_to_neighbor_only(self): with ClusterContext(1, acc1, 64, num_slaves=4) as clusters: master = clusters[0].master slaves = clusters[0].slave_list + + # Add a root block first so that later minor blocks referring to this root + # can be broadcasted to other shards + is_root, root_block = call_async( + master.get_next_block_to_mine(Address.create_empty_account(), prefer_root=True)) + self.assertTrue(is_root) + call_async(master.add_root_block(root_block)) + b1 = ( slaves[0] .shards[Branch(64 | 0)] diff --git a/quarkchain/cluster/tests/test_root_state.py b/quarkchain/cluster/tests/test_root_state.py index f026d7940..5c7949f64 100644 --- a/quarkchain/cluster/tests/test_root_state.py +++ b/quarkchain/cluster/tests/test_root_state.py @@ -12,10 +12,22 @@ def create_default_state(env, diff_calc=None): r_state = RootState(env=env, diff_calc=diff_calc) - s_state_list = [ - ShardState(env=env, shard_id=shard_id, db=quarkchain.db.InMemoryDb()) - for shard_id in range(env.quark_chain_config.SHARD_SIZE) - ] + s_state_list = [] + for shard_id in range(env.quark_chain_config.SHARD_SIZE): + shard_state = ShardState(env=env, shard_id=shard_id, db=quarkchain.db.InMemoryDb()) + shard_state.init_genesis_state(r_state.get_tip_block()) + s_state_list.append(shard_state) + + for state in s_state_list: + block_hash = state.header_tip.get_hash() + for dst_state in s_state_list: + if state == dst_state: + continue + dst_state.add_cross_shard_tx_list_by_minor_block_hash( + block_hash, CrossShardTransactionList(tx_list=[]) + ) + r_state.add_validated_minor_block_hash(block_hash) + return (r_state, s_state_list) @@ -49,7 +61,9 @@ def test_root_state_add_block(self): r_state.add_validated_minor_block_hash(b1.header.get_hash()) root_block = ( r_state.tip.create_block_to_append() + .add_minor_block_header(s_states[0].db.get_minor_block_by_height(0).header) .add_minor_block_header(b0.header) + .add_minor_block_header(s_states[1].db.get_minor_block_by_height(0).header) .add_minor_block_header(b1.header) .finalize() ) @@ -75,7 +89,9 @@ def test_root_state_and_shard_state_add_block(self): r_state.add_validated_minor_block_hash(b1.header.get_hash()) root_block = ( r_state.tip.create_block_to_append() + .add_minor_block_header(s_states[0].db.get_minor_block_by_height(0).header) .add_minor_block_header(b0.header) + .add_minor_block_header(s_states[1].db.get_minor_block_by_height(0).header) .add_minor_block_header(b1.header) .finalize() ) @@ -123,7 +139,9 @@ def test_root_state_and_shard_state_add_two_blocks(self): r_state.add_validated_minor_block_hash(b1.header.get_hash()) root_block0 = ( r_state.tip.create_block_to_append() + .add_minor_block_header(s_states[0].db.get_minor_block_by_height(0).header) .add_minor_block_header(b0.header) + .add_minor_block_header(s_states[1].db.get_minor_block_by_height(0).header) .add_minor_block_header(b1.header) .finalize() ) @@ -161,7 +179,9 @@ def test_root_state_and_shard_state_fork(self): r_state.add_validated_minor_block_hash(b1.header.get_hash()) root_block0 = ( r_state.tip.create_block_to_append() + .add_minor_block_header(s_states[0].db.get_minor_block_by_height(0).header) .add_minor_block_header(b0.header) + .add_minor_block_header(s_states[1].db.get_minor_block_by_height(0).header) .add_minor_block_header(b1.header) .finalize() ) @@ -177,7 +197,10 @@ def test_root_state_and_shard_state_fork(self): r_state.add_validated_minor_block_hash(b2.header.get_hash()) r_state.add_validated_minor_block_hash(b3.header.get_hash()) root_block1 = ( - root_block1.add_minor_block_header(b2.header) + root_block1 + .add_minor_block_header(s_states[0].db.get_minor_block_by_height(0).header) + .add_minor_block_header(b2.header) + .add_minor_block_header(s_states[1].db.get_minor_block_by_height(0).header) .add_minor_block_header(b3.header) .finalize() ) @@ -211,15 +234,16 @@ def test_root_state_difficulty(self): env = get_test_env() env.quark_chain_config.SKIP_ROOT_DIFFICULTY_CHECK = False env.quark_chain_config.ROOT.GENESIS.DIFFICULTY = 1000 - GenesisManager.finalize_config(env.quark_chain_config) diff_calc = EthDifficultyCalculator(cutoff=9, diff_factor=2048, minimum_diff=1) env.quark_chain_config.NETWORK_ID = ( 1 ) # other network ids will skip difficulty check r_state, s_states = create_default_state(env, diff_calc=diff_calc) + g0 = s_states[0].header_tip b0 = s_states[0].get_tip().create_block_to_append() add_minor_block_to_cluster(s_states, b0) + g1 = s_states[1].header_tip b1 = s_states[1].get_tip().create_block_to_append() add_minor_block_to_cluster(s_states, b1) @@ -244,7 +268,7 @@ def test_root_state_difficulty(self): ) root_block0 = r_state.create_block_to_mine( - m_header_list=[b0.header, b1.header], + m_header_list=[g0, b0.header, g1, b1.header], address=Address.create_empty_account(), create_time=r_state.tip.create_time + 26, ).finalize() @@ -279,14 +303,18 @@ def test_root_state_recovery(self): r_state.add_validated_minor_block_hash(b1.header.get_hash()) root_block0 = ( r_state.tip.create_block_to_append() + .add_minor_block_header(s_states[0].db.get_minor_block_by_height(0).header) .add_minor_block_header(b0.header) + .add_minor_block_header(s_states[1].db.get_minor_block_by_height(0).header) .add_minor_block_header(b1.header) .finalize() ) root_block00 = ( r_state.tip.create_block_to_append() + .add_minor_block_header(s_states[0].db.get_minor_block_by_height(0).header) .add_minor_block_header(b0.header) + .add_minor_block_header(s_states[1].db.get_minor_block_by_height(0).header) .add_minor_block_header(b1.header) .finalize() ) @@ -358,6 +386,7 @@ def test_add_root_block_with_minor_block_with_wrong_root_block_hash(self): """ env = get_test_env(shard_size=1) r_state, s_states = create_default_state(env) + genesis_header = s_states[0].header_tip root_block0 = r_state.get_tip_block() @@ -367,11 +396,13 @@ def test_add_root_block_with_minor_block_with_wrong_root_block_hash(self): r_state.add_validated_minor_block_hash(m1.header.get_hash()) root_block1 = ( root_block0.create_block_to_append(nonce=0) + .add_minor_block_header(genesis_header) .add_minor_block_header(m1.header) .finalize() ) root_block2 = ( root_block0.create_block_to_append(nonce=1) + .add_minor_block_header(genesis_header) .add_minor_block_header(m1.header) .finalize() ) @@ -419,10 +450,11 @@ def test_add_minor_block_with_wrong_root_block_hash(self): | +--+ +------|m2| +--+ - where m3 is invalid because m3 depeonds on r2, whose minor chain is not the same chain as m3 + where m3 is invalid because m3 depends on r2, whose minor chain is not the same chain as m3 """ env = get_test_env(shard_size=1) r_state, s_states = create_default_state(env) + genesis_header = s_states[0].header_tip root_block0 = r_state.get_tip_block() @@ -435,11 +467,13 @@ def test_add_minor_block_with_wrong_root_block_hash(self): r_state.add_validated_minor_block_hash(m2.header.get_hash()) root_block1 = ( root_block0.create_block_to_append(nonce=0) + .add_minor_block_header(genesis_header) .add_minor_block_header(m1.header) .finalize() ) root_block2 = ( root_block0.create_block_to_append(nonce=1) + .add_minor_block_header(genesis_header) .add_minor_block_header(m2.header) .finalize() ) diff --git a/quarkchain/cluster/tests/test_shard_state.py b/quarkchain/cluster/tests/test_shard_state.py index 7fac829cb..e532fdfe6 100644 --- a/quarkchain/cluster/tests/test_shard_state.py +++ b/quarkchain/cluster/tests/test_shard_state.py @@ -14,7 +14,9 @@ def create_default_shard_state(env, shard_id=0, diff_calc=None): + genesis_manager = GenesisManager(env.quark_chain_config) shard_state = ShardState(env=env, shard_id=shard_id, diff_calc=diff_calc) + shard_state.init_genesis_state(genesis_manager.create_root_block()) return shard_state @@ -607,11 +609,23 @@ def test_xshard_tx_received(self): state0 = create_default_shard_state(env=env0, shard_id=0) state1 = create_default_shard_state(env=env1, shard_id=16) + # Add a root block to allow later minor blocks referencing this root block to + # be broadcasted + root_block = ( + state0.root_tip.create_block_to_append() + .add_minor_block_header(state0.header_tip) + .add_minor_block_header(state1.header_tip) + .finalize() + ) + state0.add_root_block(root_block) + state1.add_root_block(root_block) + # Add one block in shard 0 b0 = state0.create_block_to_mine() state0.finalize_and_add_block(b0) b1 = state1.get_tip().create_block_to_append() + b1.header.hash_prev_root_block = root_block.header.get_hash() tx = create_transfer_transaction( shard_state=state1, key=id1.get_key(), @@ -741,11 +755,23 @@ def test_xshard_for_two_root_blocks(self): state0 = create_default_shard_state(env=env0, shard_id=0) state1 = create_default_shard_state(env=env1, shard_id=1) + # Add a root block to allow later minor blocks referencing this root block to + # be broadcasted + root_block = ( + state0.root_tip.create_block_to_append() + .add_minor_block_header(state0.header_tip) + .add_minor_block_header(state1.header_tip) + .finalize() + ) + state0.add_root_block(root_block) + state1.add_root_block(root_block) + # Add one block in shard 0 b0 = state0.create_block_to_mine() state0.finalize_and_add_block(b0) b1 = state1.get_tip().create_block_to_append() + b1.header.hash_prev_root_block = root_block.header.get_hash() tx = create_transfer_transaction( shard_state=state1, key=id1.get_key(), @@ -756,7 +782,7 @@ def test_xshard_for_two_root_blocks(self): ) b1.add_tx(tx) - # Add a x-shard tx from remote peer + # Add a x-shard tx from state1 state0.add_cross_shard_tx_list_by_minor_block_hash( h=b1.header.get_hash(), tx_list=CrossShardTransactionList( @@ -785,8 +811,9 @@ def test_xshard_for_two_root_blocks(self): state0.finalize_and_add_block(b2) b3 = b1.create_block_to_append() + b3.header.hash_prev_root_block = root_block.header.get_hash() - # Add a x-shard tx from remote peer + # Add a x-shard tx from state1 state0.add_cross_shard_tx_list_by_minor_block_hash( h=b3.header.get_hash(), tx_list=CrossShardTransactionList( @@ -1021,7 +1048,6 @@ def test_shard_state_difficulty(self): env = get_test_env() for shard in env.quark_chain_config.SHARD_LIST: shard.GENESIS.DIFFICULTY = 10000 - GenesisManager.finalize_config(env.quark_chain_config) env.quark_chain_config.SKIP_MINOR_DIFFICULTY_CHECK = False diff_calc = EthDifficultyCalculator( @@ -1094,7 +1120,6 @@ def test_shard_state_recovery_from_root_block(self): state.add_root_block(root_block) recoveredState = ShardState(env=env, shard_id=0) - self.assertEqual(recoveredState.header_tip.height, 0) recoveredState.init_from_root_block(root_block) # forks are pruned diff --git a/quarkchain/cluster/tests/test_utils.py b/quarkchain/cluster/tests/test_utils.py index 068bd2a06..ec2b78764 100644 --- a/quarkchain/cluster/tests/test_utils.py +++ b/quarkchain/cluster/tests/test_utils.py @@ -14,7 +14,8 @@ from quarkchain.db import InMemoryDb from quarkchain.env import DEFAULT_ENV from quarkchain.evm.transactions import Transaction as EvmTransaction -from quarkchain.genesis import GenesisManager +from quarkchain.cluster.shard import Shard +from quarkchain.cluster.shard_state import ShardState from quarkchain.protocol import AbstractConnection from quarkchain.utils import call_async, check @@ -24,6 +25,7 @@ def get_test_env( genesis_quarkash=0, genesis_minor_quarkash=0, shard_size=2, + genesis_root_heights=None, ): env = DEFAULT_ENV.copy() @@ -34,19 +36,24 @@ def get_test_env( env.quark_chain_config.update(shard_size, 1, 1) env.quark_chain_config.TESTNET_MASTER_ADDRESS = genesis_account.serialize().hex() + if genesis_root_heights: + check(len(genesis_root_heights) == shard_size) + for shard_id in range(shard_size): + env.quark_chain_config.SHARD_LIST[ + shard_id + ].GENESIS.ROOT_HEIGHT = genesis_root_heights[shard_id] + # fund genesis account in all shards for i, shard in enumerate(env.quark_chain_config.SHARD_LIST): shard.GENESIS.ALLOC[ genesis_account.address_in_shard(i).serialize().hex() ] = genesis_minor_quarkash - GenesisManager.finalize_config(env.quark_chain_config) - env.quark_chain_config.SKIP_MINOR_DIFFICULTY_CHECK = True env.quark_chain_config.SKIP_ROOT_DIFFICULTY_CHECK = True env.cluster_config.ENABLE_TRANSACTION_HISTORY = True env.cluster_config.DB_PATH_ROOT = "" - assert env.cluster_config.use_mem_db() + check(env.cluster_config.use_mem_db()) return env @@ -158,14 +165,14 @@ def __init__(self, master, slave_list, network, peer): self.network = network self.peer = peer - def get_shard(self, shard_id): + def get_shard(self, shard_id) -> Shard: branch = Branch.create(self.master.env.quark_chain_config.SHARD_SIZE, shard_id) for slave in self.slave_list: if branch in slave.shards: return slave.shards[branch] return None - def get_shard_state(self, shard_id): + def get_shard_state(self, shard_id) -> ShardState: shard = self.get_shard(shard_id) if not shard: return None @@ -184,14 +191,19 @@ def get_next_port(): return port -def create_test_clusters(num_cluster, genesis_account, shard_size, num_slaves): +def create_test_clusters( + num_cluster, genesis_account, shard_size, num_slaves, genesis_root_heights +): bootstrap_port = get_next_port() # first cluster will listen on this port cluster_list = [] loop = asyncio.get_event_loop() for i in range(num_cluster): env = get_test_env( - genesis_account, genesis_minor_quarkash=1000000, shard_size=shard_size + genesis_account, + genesis_minor_quarkash=1000000, + shard_size=shard_size, + genesis_root_heights=genesis_root_heights, ) env.cluster_config.P2P_PORT = bootstrap_port if i == 0 else get_next_port() env.cluster_config.JSON_RPC_PORT = get_next_port() @@ -270,15 +282,21 @@ def __init__( genesis_account=Address.create_empty_account(), shard_size=2, num_slaves=None, + genesis_root_heights=None, ): self.num_cluster = num_cluster self.genesis_account = genesis_account self.shard_size = shard_size self.num_slaves = num_slaves if num_slaves else shard_size + self.genesis_root_heights = genesis_root_heights def __enter__(self): self.cluster_list = create_test_clusters( - self.num_cluster, self.genesis_account, self.shard_size, self.num_slaves + self.num_cluster, + self.genesis_account, + self.shard_size, + self.num_slaves, + self.genesis_root_heights, ) return self.cluster_list diff --git a/quarkchain/cluster/tx_generator.py b/quarkchain/cluster/tx_generator.py index 0d33ed421..0a2589156 100644 --- a/quarkchain/cluster/tx_generator.py +++ b/quarkchain/cluster/tx_generator.py @@ -3,7 +3,7 @@ import time from typing import Optional -from quarkchain.core import Address, Branch, Code, Transaction +from quarkchain.core import Address, Code, Transaction from quarkchain.evm.transactions import Transaction as EvmTransaction from quarkchain.loadtest.accounts import LOADTEST_ACCOUNTS from quarkchain.utils import Logger @@ -22,12 +22,11 @@ def __init__(self, address, key): class TransactionGenerator: - def __init__(self, qkc_config, shard_id, slave_server, branch): + def __init__(self, qkc_config, shard): self.qkc_config = qkc_config - self.shard_id = shard_id - self.slave_server = slave_server + self.shard_id = shard.shard_id + self.shard = shard self.running = False - self.branch = branch self.accounts = [] for item in LOADTEST_ACCOUNTS: @@ -37,10 +36,8 @@ def __init__(self, qkc_config, shard_id, slave_server, branch): self.accounts.append(account) def generate(self, num_tx, x_shard_percent, tx: Transaction): - """Generate a bunch of transactions in the network - The total number of transactions generated each time - """ - if self.running: + """Generate a bunch of transactions in the network """ + if self.running or not self.shard.state.initialized: return False self.running = True @@ -60,17 +57,14 @@ async def __gen(self, num_tx, x_shard_percent, sample_tx: Transaction): total = 0 sample_evm_tx = sample_tx.code.get_evm_transaction() for account in self.accounts: - in_shard_address = Address( - account.address.recipient, self.shard_id - ) - nonce = self.slave_server.get_transaction_count(in_shard_address) + nonce = self.shard.state.get_transaction_count(account.address.recipient) tx = self.create_transaction(account, nonce, x_shard_percent, sample_evm_tx) if not tx: continue tx_list.append(tx) total += 1 if len(tx_list) >= 600 or total >= num_tx: - self.slave_server.shards[self.branch].add_tx_list(tx_list) + self.shard.add_tx_list(tx_list) tx_list = [] await asyncio.sleep( random.uniform(8, 12) diff --git a/quarkchain/config.py b/quarkchain/config.py index c5b6b8cd0..8d599eb5f 100644 --- a/quarkchain/config.py +++ b/quarkchain/config.py @@ -1,5 +1,6 @@ import json from enum import Enum +from typing import List import quarkchain.db import quarkchain.evm.config @@ -80,7 +81,6 @@ class ShardGenesis(BaseConfig): DIFFICULTY = 10000 NONCE = 0 ALLOC = None # dict() hex address -> qkc amount - HASH = None # Block header hash of the genesis block to avoid repeating computation def __init__(self): self.ALLOC = dict() @@ -150,7 +150,8 @@ def to_dict(self): del ret["GENESIS"] else: ret["CONSENSUS_CONFIG"] = self.CONSENSUS_CONFIG.to_dict() - ret["GENESIS"] = self.GENESIS.to_dict() + if self.GENESIS: + ret["GENESIS"] = self.GENESIS.to_dict() return ret @classmethod @@ -159,7 +160,8 @@ def from_dict(cls, d): config.CONSENSUS_TYPE = ConsensusType[config.CONSENSUS_TYPE] if config.CONSENSUS_TYPE in ConsensusType.pow_types(): config.CONSENSUS_CONFIG = POWConfig.from_dict(config.CONSENSUS_CONFIG) - config.GENESIS = ShardGenesis.from_dict(config.GENESIS) + if config.GENESIS: + config.GENESIS = ShardGenesis.from_dict(config.GENESIS) return config @@ -170,7 +172,7 @@ class RootConfig(BaseConfig): CONSENSUS_TYPE = ConsensusType.NONE CONSENSUS_CONFIG = None # Only set when CONSENSUS_TYPE is not NONE - GENESIS = None # ShardGenesis + GENESIS = None # RootGenesis def __init__(self): self.GENESIS = RootGenesis() @@ -242,6 +244,22 @@ def __init__(self): s.CONSENSUS_CONFIG.TARGET_BLOCK_TIME = 3 self.SHARD_LIST.append(s) + def get_genesis_root_height(self, shard_id) -> int: + """ Return the root block height at which the shard shall be created""" + return self.SHARD_LIST[shard_id].GENESIS.ROOT_HEIGHT + + def get_genesis_shard_ids(self) -> List[int]: + """ Return a list of ids for shards that have GENESIS""" + return [i for i, config in enumerate(self.SHARD_LIST) if config.GENESIS] + + def get_initialized_shard_ids_before_root_height(self, root_height) -> List[int]: + """ Return a list of ids of the shards that have been initialized before a certain root height""" + ids = [] + for shard_id, config in enumerate(self.SHARD_LIST): + if config.GENESIS and config.GENESIS.ROOT_HEIGHT < root_height: + ids.append(shard_id) + return ids + @property def testnet_master_address(self): return Address.create_from(self.TESTNET_MASTER_ADDRESS) @@ -262,6 +280,9 @@ def update(self, shard_size, root_block_time, minor_block_time): s.CONSENSUS_TYPE = ConsensusType.POW_SIMULATE s.CONSENSUS_CONFIG = POWConfig() s.CONSENSUS_CONFIG.TARGET_BLOCK_TIME = minor_block_time + s.GENESIS.COINBASE_ADDRESS = ( + Address.create_empty_account(i).serialize().hex() + ) self.SHARD_LIST.append(s) def to_dict(self): diff --git a/quarkchain/genesis.py b/quarkchain/genesis.py index 305bd2df8..d39afed36 100644 --- a/quarkchain/genesis.py +++ b/quarkchain/genesis.py @@ -1,4 +1,6 @@ -from quarkchain.config import QuarkChainConfig, get_default_evm_config +from typing import Optional + +from quarkchain.config import QuarkChainConfig from quarkchain.core import ( Address, MinorBlockMeta, @@ -7,9 +9,8 @@ Branch, ShardInfo, RootBlockHeader, - RootBlock + RootBlock, ) -from quarkchain.evm.config import Env as EvmEnv from quarkchain.evm.state import State as EvmState from quarkchain.utils import sha3_256, check @@ -34,7 +35,9 @@ def create_root_block(self) -> RootBlock: ) return RootBlock(header=header, minor_block_header_list=[]) - def create_minor_block(self, shard_id: int, evm_state: EvmState) -> MinorBlock: + def create_minor_block( + self, root_block: RootBlock, shard_id: int, evm_state: EvmState + ) -> MinorBlock: """ Create genesis block for shard. Genesis block's hash_prev_root_block is set to the genesis root block. Genesis state will be committed to the given evm_state. @@ -63,30 +66,10 @@ def create_minor_block(self, shard_id: int, evm_state: EvmState) -> MinorBlock: height=genesis.HEIGHT, branch=branch, hash_prev_minor_block=bytes.fromhex(genesis.HASH_PREV_MINOR_BLOCK), - hash_prev_root_block=self.create_root_block().header.get_hash(), + hash_prev_root_block=root_block.header.get_hash(), hash_meta=sha3_256(meta.serialize()), coinbase_amount=genesis.COINBASE_AMOUNT, create_time=genesis.TIMESTAMP, difficulty=genesis.DIFFICULTY, ) return MinorBlock(header=header, meta=meta, tx_list=[]) - - def get_minor_block_hash(self, shard_id: int) -> bytes: - return bytes.fromhex(self._qkc_config.SHARD_LIST[shard_id].GENESIS.HASH) - - @staticmethod - def finalize_config(qkc_config: QuarkChainConfig): - """ Fill in genesis block hashes and coinbase addresses""" - manager = GenesisManager(qkc_config) - - evm_config = get_default_evm_config() - evm_config["NETWORK_ID"] = qkc_config.NETWORK_ID - evm_env = EvmEnv(config=evm_config) - for i, shard in enumerate(qkc_config.SHARD_LIST): - evm_state = EvmState(env=evm_env) - shard.GENESIS.COINBASE_ADDRESS = ( - Address.create_empty_account(i).serialize().hex() - ) - shard.GENESIS.HASH = ( - manager.create_minor_block(i, evm_state).header.get_hash().hex() - ) diff --git a/quarkchain/tests/test_config.py b/quarkchain/tests/test_config.py index be6515113..c560d4e86 100644 --- a/quarkchain/tests/test_config.py +++ b/quarkchain/tests/test_config.py @@ -86,8 +86,7 @@ def testBasic(self): "TIMESTAMP": 1519147489, "DIFFICULTY": 10000, "NONCE": 0, - "ALLOC": {}, - "HASH": null + "ALLOC": {} } }, { @@ -107,8 +106,7 @@ def testBasic(self): "TIMESTAMP": 1519147489, "DIFFICULTY": 10000, "NONCE": 0, - "ALLOC": {}, - "HASH": null + "ALLOC": {} } }, { @@ -128,8 +126,7 @@ def testBasic(self): "TIMESTAMP": 1519147489, "DIFFICULTY": 10000, "NONCE": 0, - "ALLOC": {}, - "HASH": null + "ALLOC": {} } }, { @@ -149,8 +146,7 @@ def testBasic(self): "TIMESTAMP": 1519147489, "DIFFICULTY": 10000, "NONCE": 0, - "ALLOC": {}, - "HASH": null + "ALLOC": {} } }, {