Skip to content

Commit

Permalink
Separate pending and effective validator updates. (#2556)
Browse files Browse the repository at this point in the history
* Separate pending and effective validator updates.

- Pending validator updates do not prevent elections from concluding.
- ValidatorElection overrides has_conclude to not conclude when there is a pending update for the matching height.
- Effective validator updates deem past elections inconclusive.

* Problem: Looking for election block is inefficient.

Solution: Record placed elections, update the records upon election conclusion.

* Clarify the conclusion order in Election.process_blocks.

* Insert election records in bulk.

Otherwise, one can significantly slow nodes down by posting a whole bunch of unique elections.

* Change get_election to use find_one.

* Calculate total votes without making extra query.

* Fix the pending valset check.

* Fix election test setup.
  • Loading branch information
ldmberman authored and kansi committed Sep 21, 2018
1 parent 39be7a2 commit 24ca0b3
Show file tree
Hide file tree
Showing 13 changed files with 269 additions and 168 deletions.
23 changes: 16 additions & 7 deletions bigchaindb/backend/localmongodb/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,16 +282,26 @@ def store_validator_set(conn, validators_update):


@register_query(LocalMongoDBConnection)
def store_election_results(conn, election):
def store_election(conn, election_id, height, is_concluded):
return conn.run(
conn.collection('elections').replace_one(
{'election_id': election['election_id']},
election,
{'election_id': election_id,
'height': height},
{'election_id': election_id,
'height': height,
'is_concluded': is_concluded},
upsert=True,
)
)


@register_query(LocalMongoDBConnection)
def store_elections(conn, elections):
return conn.run(
conn.collection('elections').insert_many(elections)
)


@register_query(LocalMongoDBConnection)
def get_validator_set(conn, height=None):
query = {}
Expand All @@ -312,13 +322,12 @@ def get_validator_set(conn, height=None):
def get_election(conn, election_id):
query = {'election_id': election_id}

cursor = conn.run(
return conn.run(
conn.collection('elections')
.find(query, projection={'_id': False})
.find_one(query, projection={'_id': False},
sort=[('height', DESCENDING)])
)

return next(cursor, None)


@register_query(LocalMongoDBConnection)
def get_asset_tokens_for_public_key(conn, asset_id, public_key):
Expand Down
3 changes: 2 additions & 1 deletion bigchaindb/backend/localmongodb/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@
('commit_id', dict(name='pre_commit_id', unique=True)),
],
'elections': [
('election_id', dict(name='election_id', unique=True)),
([('height', DESCENDING), ('election_id', ASCENDING)],
dict(name='election_id_height', unique=True)),
],
'validators': [
('height', dict(name='height', unique=True)),
Expand Down
13 changes: 10 additions & 3 deletions bigchaindb/backend/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,8 +352,15 @@ def store_validator_set(conn, validator_update):


@singledispatch
def store_election_results(conn, election):
"""Store election results"""
def store_election(conn, election_id, height, is_concluded):
"""Store election record"""

raise NotImplementedError


@singledispatch
def store_elections(conn, elections):
"""Store election records in bulk"""

raise NotImplementedError

Expand All @@ -369,7 +376,7 @@ def get_validator_set(conn, height):

@singledispatch
def get_election(conn, election_id):
"""Return a validator set change with the specified election_id
"""Return the election record
"""

raise NotImplementedError
Expand Down
9 changes: 4 additions & 5 deletions bigchaindb/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,18 +215,17 @@ def end_block(self, request_end_block):
else:
self.block_txn_hash = block['app_hash']

# Process all concluded elections in the current block and get any update to the validator set
update = Election.approved_elections(self.bigchaindb,
self.new_height,
self.block_transactions)
validator_update = Election.process_block(self.bigchaindb,
self.new_height,
self.block_transactions)

# Store pre-commit state to recover in case there is a crash during `commit`
pre_commit_state = PreCommitState(commit_id=PRE_COMMIT_ID,
height=self.new_height,
transactions=self.block_txn_ids)
logger.debug('Updating PreCommitState: %s', self.new_height)
self.bigchaindb.store_pre_commit_state(pre_commit_state._asdict())
return ResponseEndBlock(validator_updates=update)
return ResponseEndBlock(validator_updates=validator_update)

def commit(self):
"""Store the new height and along with block hash."""
Expand Down
130 changes: 77 additions & 53 deletions bigchaindb/elections/election.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Copyright BigchainDB GmbH and BigchainDB contributors
# SPDX-License-Identifier: (Apache-2.0 AND CC-BY-4.0)
# Code is Apache-2.0 and docs are CC-BY-4.0
from collections import defaultdict
from collections import OrderedDict

import base58
from uuid import uuid4
Expand All @@ -22,9 +22,13 @@


class Election(Transaction):
"""Represents election transactions.
To implement a custom election, create a class deriving from this one
with OPERATION set to the election operation, ALLOWED_OPERATIONS
set to (OPERATION,), CREATE set to OPERATION.
"""

# NOTE: this transaction class extends create so the operation inheritance is achieved
# by setting an ELECTION_TYPE and renaming CREATE = ELECTION_TYPE and ALLOWED_OPERATIONS = (ELECTION_TYPE,)
OPERATION = None
# Custom validation schema
TX_SCHEMA_CUSTOM = None
Expand All @@ -34,7 +38,6 @@ class Election(Transaction):
INCONCLUSIVE = 'inconclusive'
# Vote ratio to approve an election
ELECTION_THRESHOLD = 2 / 3
CHANGES_VALIDATOR_SET = True

@classmethod
def get_validator_change(cls, bigchain):
Expand All @@ -45,8 +48,10 @@ def get_validator_change(cls, bigchain):
'validators': <validator_set>
}
"""
height = bigchain.get_latest_block()['height']
return bigchain.get_validator_change(height)
latest_block = bigchain.get_latest_block()
if latest_block is None:
return None
return bigchain.get_validator_change(latest_block['height'])

@classmethod
def get_validators(cls, bigchain, height=None):
Expand Down Expand Up @@ -186,49 +191,52 @@ def get_commited_votes(self, bigchain, election_pk=None):
election_pk))
return self.count_votes(election_pk, txns, dict.get)

def has_concluded(self, bigchain, current_votes=[], height=None):
def has_concluded(self, bigchain, current_votes=[]):
"""Check if the election can be concluded or not.
* Elections can only be concluded if the current validator set
is exactly equal to the validator set encoded in the election outputs.
* Elections can only be concluded if the validator set has not changed
since the election was initiated.
* Elections can be concluded only if the current votes form a supermajority.
Custom elections may override this function and introduce additional checks.
"""
if self.has_validator_set_changed(bigchain):
return False

election_pk = self.to_public_key(self.id)
votes_committed = self.get_commited_votes(bigchain, election_pk)
votes_current = self.count_votes(election_pk, current_votes)
current_validators = self.get_validators(bigchain, height)

if self.is_same_topology(current_validators, self.outputs):
total_votes = sum(current_validators.values())
if (votes_committed < (2/3) * total_votes) and \
(votes_committed + votes_current >= (2/3)*total_votes):
return True
total_votes = sum(output.amount for output in self.outputs)
if (votes_committed < (2/3) * total_votes) and \
(votes_committed + votes_current >= (2/3)*total_votes):
return True

return False

def get_status(self, bigchain):
concluded = self.get_election(self.id, bigchain)
if concluded:
election = self.get_election(self.id, bigchain)
if election and election['is_concluded']:
return self.CONCLUDED

return self.INCONCLUSIVE if self.has_validator_set_changed(bigchain) else self.ONGOING

def has_validator_set_changed(self, bigchain):
latest_change = self.get_validator_change(bigchain)
if latest_change is None:
return False

latest_change_height = latest_change['height']
election_height = bigchain.get_block_containing_tx(self.id)[0]

if latest_change_height >= election_height:
return self.INCONCLUSIVE
else:
return self.ONGOING
election = self.get_election(self.id, bigchain)

return latest_change_height > election['height']

def get_election(self, election_id, bigchain):
result = bigchain.get_election(election_id)
return result
return bigchain.get_election(election_id)

@classmethod
def store_election_results(cls, bigchain, election, height):
bigchain.store_election_results(height, election)
def store(self, bigchain, height, is_concluded):
bigchain.store_election(self.id, height, is_concluded)

def show_election(self, bigchain):
data = self.asset['data']
Expand All @@ -243,45 +251,61 @@ def show_election(self, bigchain):
return response

@classmethod
def approved_elections(cls, bigchain, new_height, txns):
elections = defaultdict(list)
def process_block(cls, bigchain, new_height, txns):
"""Looks for election and vote transactions inside the block, records
and processes elections.
Every election is recorded in the database.
Every vote has a chance to conclude the corresponding election. When
an election is concluded, the corresponding database record is
marked as such.
Elections and votes are processed in the order in which they
appear in the block. Elections are concluded in the order of
appearance of their first votes in the block.
For every election concluded in the block, calls its `on_approval`
method. The returned value of the last `on_approval`, if any,
is a validator set update to be applied in one of the following blocks.
`on_approval` methods are implemented by elections of particular type.
The method may contain side effects but should be idempotent. To account
for other concluded elections, if it requires so, the method should
rely on the database state.
"""
# elections placed in this block
initiated_elections = []
# elections voted for in this block and their votes
elections = OrderedDict()
for tx in txns:
if isinstance(tx, Election):
initiated_elections.append({'election_id': tx.id,
'height': new_height,
'is_concluded': False})
if not isinstance(tx, Vote):
continue
election_id = tx.asset['id']
if election_id not in elections:
elections[election_id] = []
elections[election_id].append(tx)

validator_set_updated = False
validator_set_change = []
if initiated_elections:
bigchain.store_elections(initiated_elections)

validator_update = None
for election_id, votes in elections.items():
election = bigchain.get_transaction(election_id)
if election is None:
continue

if not election.has_concluded(bigchain, votes, new_height):
if not election.has_concluded(bigchain, votes):
continue

if election.makes_validator_set_change():
if validator_set_updated:
continue
validator_set_change.append(election.get_validator_set_change(bigchain, new_height))
validator_set_updated = True

election.on_approval(bigchain, election, new_height)
election.store_election_results(bigchain, election, new_height)

return validator_set_change
validator_update = election.on_approval(bigchain, new_height)
election.store(bigchain, new_height, is_concluded=True)

def makes_validator_set_change(self):
return self.CHANGES_VALIDATOR_SET
return [validator_update] if validator_update else []

def get_validator_set_change(self, bigchain, new_height):
if self.makes_validator_set_change():
return self.change_validator_set(bigchain, new_height)

def change_validator_set(self, bigchain, new_height):
raise NotImplementedError

@classmethod
def on_approval(cls, bigchain, election, new_height):
def on_approval(self, bigchain, new_height):
raise NotImplementedError
16 changes: 7 additions & 9 deletions bigchaindb/lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -436,8 +436,7 @@ def get_validators(self, height=None):
return [] if result is None else result['validators']

def get_election(self, election_id):
result = backend.query.get_election(self.connection, election_id)
return result
return backend.query.get_election(self.connection, election_id)

def store_pre_commit_state(self, state):
return backend.query.store_pre_commit_state(self.connection, state)
Expand Down Expand Up @@ -481,13 +480,12 @@ def migrate_abci_chain(self):

self.store_abci_chain(block['height'] + 1, new_chain_id, False)

def store_election_results(self, height, election):
"""Store election results
:param height: the block height at which the election concluded
:param election: a concluded election
"""
return backend.query.store_election_results(self.connection, {'height': height,
'election_id': election.id})
def store_election(self, election_id, height, is_concluded):
return backend.query.store_election(self.connection, election_id,
height, is_concluded)

def store_elections(self, elections):
return backend.query.store_elections(self.connection, elections)


Block = namedtuple('Block', ('app_hash', 'height', 'transactions'))
Expand Down
4 changes: 1 addition & 3 deletions bigchaindb/migrations/chain_migration_election.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ class ChainMigrationElection(Election):
CREATE = OPERATION
ALLOWED_OPERATIONS = (OPERATION,)
TX_SCHEMA_CUSTOM = TX_SCHEMA_CHAIN_MIGRATION_ELECTION
CHANGES_VALIDATOR_SET = False

def has_concluded(self, bigchaindb, *args, **kwargs):
chain = bigchaindb.get_latest_abci_chain()
Expand All @@ -19,6 +18,5 @@ def has_concluded(self, bigchaindb, *args, **kwargs):

return super().has_concluded(bigchaindb, *args, **kwargs)

@classmethod
def on_approval(cls, bigchain, election, new_height):
def on_approval(self, bigchain, *args, **kwargs):
bigchain.migrate_abci_chain()
28 changes: 19 additions & 9 deletions bigchaindb/upsert_validator/validator_election.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,28 @@ def validate_schema(cls, tx):
super(ValidatorElection, cls).validate_schema(tx)
validate_asset_public_key(tx['asset']['data']['public_key'])

def change_validator_set(self, bigchain, new_height):
# The new validator set comes into effect from height = new_height+1
# (upcoming changes to Tendermint will change this to height = new_height+2)
def has_concluded(self, bigchain, *args, **kwargs):
latest_block = bigchain.get_latest_block()
if latest_block is not None:
latest_block_height = latest_block['height']
latest_validator_change = bigchain.get_validator_change()['height']

# TODO change to `latest_block_height + 3` when upgrading to Tendermint 0.24.0.
if latest_validator_change == latest_block_height + 2:
# do not conclude the election if there is a change assigned already
return False

return super().has_concluded(bigchain, *args, **kwargs)

def on_approval(self, bigchain, new_height):
validator_updates = [self.asset['data']]
curr_validator_set = bigchain.get_validators(new_height)
updated_validator_set = new_validator_set(curr_validator_set,
validator_updates)

updated_validator_set = [v for v in updated_validator_set if v['voting_power'] > 0]
bigchain.store_validator_set(new_height+1, updated_validator_set)
return encode_validator(self.asset['data'])
updated_validator_set = [v for v in updated_validator_set
if v['voting_power'] > 0]

@classmethod
def on_approval(cls, bigchain, election, new_height):
pass
# TODO change to `new_height + 2` when upgrading to Tendermint 0.24.0.
bigchain.store_validator_set(new_height + 1, updated_validator_set)
return encode_validator(self.asset['data'])

0 comments on commit 24ca0b3

Please sign in to comment.