Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Core/196/handle backlog deletes #337

Closed
wants to merge 12 commits into from
95 changes: 92 additions & 3 deletions bigchaindb/block.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,17 +64,16 @@ def validate_transactions(self):

# poison pill
if tx == 'stop':
self.q_tx_delete.put('stop')
self.q_tx_validated.put('stop')
return

self.q_tx_delete.put(tx['id'])

with self.monitor.timer('validate_transaction', rate=bigchaindb.config['statsd']['rate']):
is_valid_transaction = b.is_valid_transaction(tx)

if is_valid_transaction:
self.q_tx_validated.put(tx)
else:
self.q_tx_delete.put(tx['id'])

def create_blocks(self):
"""
Expand Down Expand Up @@ -126,11 +125,15 @@ def write_blocks(self):

# poison pill
if block == 'stop':
self.q_tx_delete.put('stop')
return

with self.monitor.timer('write_block'):
b.write_block(block)

for tx in block['block']['transactions']:
self.q_tx_delete.put(tx['id'])

def delete_transactions(self):
"""
Delete transactions from the backlog
Expand Down Expand Up @@ -229,3 +232,89 @@ def _start(self):
p_write.start()
p_delete.start()


class BacklogDeleteRevert(Block):

def __init__(self, q_backlog_delete):
# invalid transactions can stay deleted
self.q_tx_to_validate = q_backlog_delete
self.q_tx_validated = mp.Queue()
self.q_transaction_to_revert = mp.Queue()
self.q_tx_delete = mp.Queue()

self.monitor = Monitor()

def locate_transactions(self):
"""
Determine if a deleted transaction has made it into a block
"""
# create bigchain instance
b = Bigchain()

while True:
tx = self.q_tx_validated.get()

# poison pill
if tx == 'stop':
self.q_tx_delete.put('stop')
self.q_transaction_to_revert.put('stop')
return

# check if tx is in a (valid) block
validity = b.get_blocks_status_containing_tx(tx['id'])

if validity and list(validity.values()).count(Bigchain.BLOCK_VALID) == 1:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will not work like this. There is some period of time since a block is created (and transactions are deleted) and the block is voted valid.

Example timeline:

  1. Block1 is createad with tx1 / tx1 is deleted
  2. Receive the delete change for tx1
  3. Block is not valid yet / re-insert tx1
  4. Block1 is voted valid
  5. Block2 is created with tx1

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a tunable delay with the latest commit that should fix the problem. The default is 30s. Can you confirm this fixes the problem? If so I will add the delay to the configurable settings, documentation, etc

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to check the validity of the block? This seems to be doing overlapping work with #193

The way I see it there are 3 different possibilities:

  1. if a transaction was deleted but made it into a block regardless of validity, its ok (if the block is later invalid Transactions from an invalid block should be put back into the backlog #193 should take care of the rest).
  2. If the transaction did not made it into a block and is invalid, its ok
  3. If the transaction did not made it into a block and is valid, revert

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I only meant to check the validity of transactions -- am I missing something big here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes that is true.
I think that it would be better to instead of subclassing Block just duplicate the validate_transaction in BacklogDeleteRevert.

This way we don't need to add extra functionality to empty the delete queue and it becomes easier to read the code. Looking at the code it took me some time to figure out how the transactions went from q_tx_to_validate to q_tx_validated

# tx made it into a block, and can safely be deleted
self.q_tx_delete.put(tx['id'])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need to store the transactions here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As opposed to just the txid? No I don't think so, I will make the changes.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was more what do we do with q_tx_delete?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's there because BacklogDeleteRevert is a derived class of block, so I didn't have to rewrite the validate_transactions. It expects a queue to take invalid transactions, but yes it's functionally useless.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should not be a problem. You never start the delete_transactions process

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yikes, my mistake...but couldn't the queue fill up and start throwing exceptions?

else:
# valid tx not in any block, should be re-inserted into backlog
self.q_transaction_to_revert.put(tx)

def revert_deletes(self):
"""
Put an incorrectly deleted transaction back in the backlog
"""
# create bigchain instance
b = Bigchain()

while True:
tx = self.q_transaction_to_revert.get()

# poison pill
if tx == 'stop':
return

b.write_transaction(tx)

def empty_delete_q(self):
"""
Empty the delete queue
"""

while True:
txid = self.q_tx_delete.get()

# poison pill
if txid == 'stop':
return

def kill(self):
for i in range(mp.cpu_count()):
self.q_tx_to_validate.put('stop')

def start(self):
"""
Initialize, spawn, and start the processes
"""

# initialize the processes
p_validate = ProcessGroup(name='validate_transactions', target=self.validate_transactions)
p_locate = ProcessGroup(name='locate_transactions', target=self.locate_transactions)
p_revert = ProcessGroup(name='revert_deletes', target=self.revert_deletes)
p_empty_delete_q = ProcessGroup(name='empty_delete_q', target=self.empty_delete_q)

# start the processes
p_validate.start()
p_locate.start()
p_revert.start()
p_empty_delete_q.start()
7 changes: 5 additions & 2 deletions bigchaindb/processes.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import bigchaindb
from bigchaindb import Bigchain
from bigchaindb.voter import Voter, Election
from bigchaindb.block import Block
from bigchaindb.block import Block, BacklogDeleteRevert
from bigchaindb.web import server


Expand All @@ -32,6 +32,7 @@ def __init__(self):
self.q_new_block = mp.Queue()
self.q_new_transaction = mp.Queue()
self.q_block_new_vote = mp.Queue()
self.q_backlog_delete = mp.Queue()

def map_backlog(self):
# listen to changes on the backlog and redirect the changes
Expand All @@ -48,7 +49,7 @@ def map_backlog(self):

# delete
if change['new_val'] is None:
pass
self.q_backlog_delete.put(change['old_val'])

# update
if change['new_val'] is not None and change['old_val'] is not None:
Expand Down Expand Up @@ -92,6 +93,7 @@ def start(self):
p_block = mp.Process(name='block', target=block.start)
p_voter = Voter(self.q_new_block)
p_election = Election(self.q_block_new_vote)
p_backlog_delete = BacklogDeleteRevert(self.q_backlog_delete)

# start the processes
logger.info('starting bigchain mapper')
Expand All @@ -100,6 +102,7 @@ def start(self):
p_map_backlog.start()
logger.info('starting block')
p_block.start()
p_backlog_delete.start()

logger.info('starting voter')
p_voter.start()
Expand Down
68 changes: 67 additions & 1 deletion tests/db/test_bigchain_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from bigchaindb import exceptions
from bigchaindb import crypto
from bigchaindb.voter import Voter
from bigchaindb.block import Block
from bigchaindb.block import Block, BacklogDeleteRevert


@pytest.mark.skipif(reason='Some tests throw a ResourceWarning that might result in some weird '
Expand Down Expand Up @@ -639,6 +639,11 @@ def test_validate_transactions(self, b, user_vk):

# check if the number of valid transactions
assert block.q_tx_validated.qsize() - 1 == count_valid
assert block.q_tx_delete.qsize() == 100 - count_valid

block.create_blocks()
block.write_blocks()

assert block.q_tx_delete.qsize() - 1 == 100

def test_create_block(self, b, user_vk):
Expand Down Expand Up @@ -717,6 +722,67 @@ def test_delete_transactions(self, b, user_vk):
# check if all transactions were deleted from the backlog
assert r.table('backlog').count() == 0

def test_revert_delete_transactions(self, b, user_vk):
# make sure that there are no transactions in the backlog
r.table('backlog').delete().run(b.conn)

# create transactions and randomly invalidate some of them by changing the hash
q_transactions = mp.Queue()
count_valid = 0
for i in range(100):
valid = random.choice([True, False])
tx = b.create_transaction(b.me, user_vk, None, 'CREATE')
tx = b.sign_transaction(tx, b.me_private)
if not valid:
tx['id'] = 'a' * 64
else:
count_valid += 1
q_transactions.put(tx)

# this is like a changefeed of deleted transactions
reverter = BacklogDeleteRevert(q_transactions)

reverter.start()
time.sleep(5)
reverter.kill()

# only valid transactions should make it back to the backlog
assert r.table('backlog').count().run(b.conn) == count_valid

# make sure that there are no transactions in the backlog
r.table('backlog').delete().run(b.conn)
genesis = b.create_genesis_block()

# make a block
txs = [b.create_transaction(b.me, user_vk, None, 'CREATE') for i in range(100)]
txs = [b.sign_transaction(tx, b.me_private) for tx in txs]
block = b.create_block(txs)

# write a block and vote it valid
b.write_block(block, durability='hard')
vote = b.vote(block, genesis['id'], True)
b.write_vote(block, vote, 1)

q_transactions = mp.Queue()
for tx in txs:
q_transactions.put(tx)

# put some transactions in the queue that aren't in a block
for i in range(50):
tx = b.create_transaction(b.me, user_vk, None, 'CREATE')
tx = b.sign_transaction(tx, b.me_private)
q_transactions.put(tx)

# this is like a changefeed of deleted transactions
reverter = BacklogDeleteRevert(q_transactions)

reverter.start()
time.sleep(5)
reverter.kill()

# only the 50 transactions not in a block should make it
assert r.table('backlog').count().run(b.conn) == 50

def test_bootstrap(self, b, user_vk):
# make sure that there are no transactions in the backlog
r.table('backlog').delete().run(b.conn)
Expand Down