Skip to content

Commit

Permalink
ABCI chain migration conclusion (#2488)
Browse files Browse the repository at this point in the history
* Problem: No good way to check for val set absence.

Solution: Make get_validator_set/get_validators return None/[] when there are no validators yet.

* Problem: Incompatible ABCI chain upgrades.

Solution: Record known chains and sync through InitChain. Triggering the migration and adjusting other ABCI endpoints will follow.
  • Loading branch information
ldmberman authored and kansi committed Sep 3, 2018
1 parent fe0a4c4 commit 230a5b2
Show file tree
Hide file tree
Showing 13 changed files with 492 additions and 23 deletions.
22 changes: 21 additions & 1 deletion bigchaindb/backend/localmongodb/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ def get_validator_set(conn, height=None):
.limit(1)
)

return list(cursor)[0]
return next(cursor, None)


@register_query(LocalMongoDBConnection)
Expand All @@ -322,3 +322,23 @@ def get_asset_tokens_for_public_key(conn, asset_id, public_key):
{'$project': {'_id': False}}
]))
return cursor


@register_query(LocalMongoDBConnection)
def store_abci_chain(conn, height, chain_id, is_synced=True):
return conn.run(
conn.collection('abci_chains').replace_one(
{'height': height},
{'height': height, 'chain_id': chain_id,
'is_synced': is_synced},
upsert=True,
)
)


@register_query(LocalMongoDBConnection)
def get_latest_abci_chain(conn):
return conn.run(
conn.collection('abci_chains')
.find_one(projection={'_id': False}, sort=[('height', DESCENDING)])
)
14 changes: 14 additions & 0 deletions bigchaindb/backend/localmongodb/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ def create_indexes(conn, dbname):
create_utxos_secondary_index(conn, dbname)
create_pre_commit_secondary_index(conn, dbname)
create_validators_secondary_index(conn, dbname)
create_abci_chains_indexes(conn, dbname)


@register_schema(LocalMongoDBConnection)
Expand Down Expand Up @@ -133,3 +134,16 @@ def create_validators_secondary_index(conn, dbname):
conn.conn[dbname]['validators'].create_index('height',
name='height',
unique=True,)


def create_abci_chains_indexes(conn, dbname):
logger.info('Create `abci_chains.height` secondary index.')

conn.conn[dbname]['abci_chains'].create_index('height',
name='height',
unique=True,)

logger.info('Create `abci_chains.chain_id` secondary index.')
conn.conn[dbname]['abci_chains'].create_index('chain_id',
name='chain_id',
unique=True)
20 changes: 20 additions & 0 deletions bigchaindb/backend/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -380,3 +380,23 @@ def get_asset_tokens_for_public_key(connection, asset_id,
Iterator of transaction that list given owner in conditions.
"""
raise NotImplementedError


@singledispatch
def store_abci_chain(conn, height, chain_id, is_synced=True):
"""Create or update an ABCI chain at the given height.
Usually invoked in the beginning of the ABCI communications (height=0)
or when ABCI client (like Tendermint) is migrated (any height).
Args:
is_synced: True if the chain is known by both ABCI client and server
"""
raise NotImplementedError


@singledispatch
def get_latest_abci_chain(conn):
"""Returns the ABCI chain stored at the biggest height, if any,
None otherwise.
"""
raise NotImplementedError
2 changes: 1 addition & 1 deletion bigchaindb/backend/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

# Tables/collections that every backend database must create
TABLES = ('transactions', 'blocks', 'assets', 'metadata',
'validators', 'pre_commit', 'utxos')
'validators', 'pre_commit', 'utxos', 'abci_chains')

VALID_LANGUAGES = ('danish', 'dutch', 'english', 'finnish', 'french', 'german',
'hungarian', 'italian', 'norwegian', 'portuguese', 'romanian',
Expand Down
83 changes: 76 additions & 7 deletions bigchaindb/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
with Tendermint.
"""
import logging
import sys

from abci.application import BaseApplication
from abci.types_pb2 import (
Expand Down Expand Up @@ -47,22 +48,76 @@ def __init__(self, bigchaindb=None):
self.block_transactions = []
self.validators = None
self.new_height = None
self.chain = self.bigchaindb.get_latest_abci_chain()

def log_abci_migration_error(self, chain_id, validators):
logger.error(f'An ABCI chain migration is in process. ' +
'Download the new ABCI client and configure it with ' +
'chain_id={chain_id} and validators={validators}.')

def abort_if_abci_chain_is_not_synced(self):
if self.chain is None or self.chain['is_synced']:
return

validators = self.bigchaindb.get_validators()
self.log_abci_migration_error(self.chain['chain_id'], validators)
sys.exit(1)

def init_chain(self, genesis):
"""Initialize chain with block of height 0"""
"""Initialize chain upon genesis or a migration"""

app_hash = ''
height = 0

known_chain = self.bigchaindb.get_latest_abci_chain()
if known_chain is not None:
chain_id = known_chain['chain_id']

if known_chain['is_synced']:
msg = f'Got invalid InitChain ABCI request ({genesis}) - ' + \
'the chain {chain_id} is already synced.'
logger.error(msg)
sys.exit(1)

if chain_id != genesis.chain_id:
validators = self.bigchaindb.get_validators()
self.log_abci_migration_error(chain_id, validators)
sys.exit(1)

# set migration values for app hash and height
block = self.bigchaindb.get_latest_block()
app_hash = '' if block is None else block['app_hash']
height = 0 if block is None else block['height'] + 1

known_validators = self.bigchaindb.get_validators()
validator_set = [vutils.decode_validator(v)
for v in genesis.validators]

if known_validators and known_validators != validator_set:
self.log_abci_migration_error(known_chain['chain_id'],
known_validators)
sys.exit(1)

validator_set = [vutils.decode_validator(v) for v in genesis.validators]
block = Block(app_hash='', height=0, transactions=[])
block = Block(app_hash=app_hash, height=height, transactions=[])
self.bigchaindb.store_block(block._asdict())
self.bigchaindb.store_validator_set(1, validator_set, None)
self.bigchaindb.store_validator_set(height + 1, validator_set, None)
abci_chain_height = 0 if known_chain is None else known_chain['height']
self.bigchaindb.store_abci_chain(abci_chain_height,
genesis.chain_id, True)
self.chain = {'height': abci_chain_height, 'is_synced': True,
'chain_id': genesis.chain_id}
return ResponseInitChain()

def info(self, request):
"""Return height of the latest committed block."""

self.abort_if_abci_chain_is_not_synced()

r = ResponseInfo()
block = self.bigchaindb.get_latest_block()
if block:
r.last_block_height = block['height']
chain_shift = 0 if self.chain is None else self.chain['height']
r.last_block_height = block['height'] - chain_shift
r.last_block_app_hash = block['app_hash'].encode('utf-8')
else:
r.last_block_height = 0
Expand All @@ -77,6 +132,8 @@ def check_tx(self, raw_transaction):
raw_tx: a raw string (in bytes) transaction.
"""

self.abort_if_abci_chain_is_not_synced()

logger.benchmark('CHECK_TX_INIT')
logger.debug('check_tx: %s', raw_transaction)
transaction = decode_transaction(raw_transaction)
Expand All @@ -95,8 +152,11 @@ def begin_block(self, req_begin_block):
req_begin_block: block object which contains block header
and block hash.
"""
self.abort_if_abci_chain_is_not_synced()

chain_shift = 0 if self.chain is None else self.chain['height']
logger.benchmark('BEGIN BLOCK, height:%s, num_txs:%s',
req_begin_block.header.height,
req_begin_block.header.height + chain_shift,
req_begin_block.header.num_txs)

self.block_txn_ids = []
Expand All @@ -109,6 +169,9 @@ def deliver_tx(self, raw_transaction):
Args:
raw_tx: a raw string (in bytes) transaction.
"""

self.abort_if_abci_chain_is_not_synced()

logger.debug('deliver_tx: %s', raw_transaction)
transaction = self.bigchaindb.is_valid_transaction(
decode_transaction(raw_transaction), self.block_transactions)
Expand All @@ -130,7 +193,11 @@ def end_block(self, request_end_block):
height (int): new height of the chain.
"""

height = request_end_block.height
self.abort_if_abci_chain_is_not_synced()

chain_shift = 0 if self.chain is None else self.chain['height']

height = request_end_block.height + chain_shift
self.new_height = height
block_txn_hash = calculate_hash(self.block_txn_ids)
block = self.bigchaindb.get_latest_block()
Expand Down Expand Up @@ -158,6 +225,8 @@ def end_block(self, request_end_block):
def commit(self):
"""Store the new height and along with block hash."""

self.abort_if_abci_chain_is_not_synced()

data = self.block_txn_hash.encode('utf-8')

# register a new block only when new transactions are received
Expand Down
34 changes: 32 additions & 2 deletions bigchaindb/lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -426,8 +426,7 @@ def get_validator_change(self, height=None):

def get_validators(self, height=None):
result = self.get_validator_change(height)
validators = result['validators']
return validators
return [] if result is None else result['validators']

def get_validators_by_election_id(self, election_id):
result = backend.query.get_validator_set_by_election_id(self.connection, election_id)
Expand All @@ -448,6 +447,37 @@ def store_validator_set(self, height, validators, election_id):
'validators': validators,
'election_id': election_id})

def store_abci_chain(self, height, chain_id, is_synced=True):
return backend.query.store_abci_chain(self.connection, height,
chain_id, is_synced)

def get_latest_abci_chain(self):
return backend.query.get_latest_abci_chain(self.connection)

def migrate_abci_chain(self):
"""Generate and record a new ABCI chain ID. New blocks are not
accepted until we receive an InitChain ABCI request with
the matching chain ID and validator set.
Chain ID is generated based on the current chain and height.
`chain-X` => `chain-X-migrated-at-height-5`.
`chain-X-migrated-at-height-5` => `chain-X-migrated-at-height-21`.
If there is no known chain (we are at genesis), the function returns.
"""
latest_chain = self.get_latest_abci_chain()
if latest_chain is None:
return

block = self.get_latest_block()

suffix = '-migrated-at-height-'
chain_id = latest_chain['chain_id']
block_height_str = str(block['height'])
new_chain_id = chain_id.split(suffix)[0] + suffix + block_height_str

self.store_abci_chain(block['height'] + 1, new_chain_id, False)


Block = namedtuple('Block', ('app_hash', 'height', 'transactions'))

Expand Down
51 changes: 51 additions & 0 deletions tests/backend/localmongodb/test_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
import pytest
import pymongo

from bigchaindb.backend import connect, query


pytestmark = [pytest.mark.tendermint, pytest.mark.bdb]


Expand Down Expand Up @@ -394,3 +397,51 @@ def gen_validator_update(height):

v91 = query.get_validator_set(conn)
assert v91['height'] == 91


@pytest.mark.parametrize('description,stores,expected', [
(
'Query empty database.',
[],
None,
),
(
'Store one chain with the default value for `is_synced`.',
[
{'height': 0, 'chain_id': 'some-id'},
],
{'height': 0, 'chain_id': 'some-id', 'is_synced': True},
),
(
'Store one chain with a custom value for `is_synced`.',
[
{'height': 0, 'chain_id': 'some-id', 'is_synced': False},
],
{'height': 0, 'chain_id': 'some-id', 'is_synced': False},
),
(
'Store one chain, then update it.',
[
{'height': 0, 'chain_id': 'some-id', 'is_synced': True},
{'height': 0, 'chain_id': 'new-id', 'is_synced': False},
],
{'height': 0, 'chain_id': 'new-id', 'is_synced': False},
),
(
'Store a chain, update it, store another chain.',
[
{'height': 0, 'chain_id': 'some-id', 'is_synced': True},
{'height': 0, 'chain_id': 'some-id', 'is_synced': False},
{'height': 10, 'chain_id': 'another-id', 'is_synced': True},
],
{'height': 10, 'chain_id': 'another-id', 'is_synced': True},
),
])
def test_store_abci_chain(description, stores, expected):
conn = connect()

for store in stores:
query.store_abci_chain(conn, **store)

actual = query.get_latest_abci_chain(conn)
assert expected == actual, description
8 changes: 6 additions & 2 deletions tests/backend/localmongodb/test_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def test_init_creates_db_tables_and_indexes():
collection_names = conn.conn[dbname].collection_names()
assert set(collection_names) == {
'transactions', 'assets', 'metadata', 'blocks', 'utxos', 'pre_commit',
'validators'
'validators', 'abci_chains',
}

indexes = conn.conn[dbname]['assets'].index_information().keys()
Expand All @@ -46,6 +46,9 @@ def test_init_creates_db_tables_and_indexes():
indexes = conn.conn[dbname]['validators'].index_information().keys()
assert set(indexes) == {'_id_', 'height'}

indexes = conn.conn[dbname]['abci_chains'].index_information().keys()
assert set(indexes) == {'_id_', 'height', 'chain_id'}


def test_init_database_fails_if_db_exists():
import bigchaindb
Expand Down Expand Up @@ -79,7 +82,8 @@ def test_create_tables():
collection_names = conn.conn[dbname].collection_names()
assert set(collection_names) == {
'transactions', 'assets', 'metadata', 'blocks', 'utxos', 'validators',
'pre_commit'}
'pre_commit', 'abci_chains',
}


def test_create_secondary_indexes():
Expand Down
3 changes: 2 additions & 1 deletion tests/tendermint/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ def validator_pub_key():
@pytest.fixture
def init_chain_request():
addr = codecs.decode(b'9FD479C869C7D7E7605BF99293457AA5D80C3033', 'hex')
pk = codecs.decode(b'VAgFZtYw8bNR5TMZHFOBDWk9cAmEu3/c6JgRBmddbbI=', 'base64')
pk = codecs.decode(b'VAgFZtYw8bNR5TMZHFOBDWk9cAmEu3/c6JgRBmddbbI=',
'base64')
val_a = types.Validator(address=addr, power=10,
pub_key=types.PubKey(type='ed25519', data=pk))

Expand Down

0 comments on commit 230a5b2

Please sign in to comment.