Skip to content

Commit

Permalink
[ASI-794] Skip tx not block when missing refs (#2295)
Browse files Browse the repository at this point in the history
* Adds model nullable validator util and unit tests
* Adds 'level' column to skipped txs table
* Adds logic to skip tx without raising error when a record to be
inserted is missing required fields
* Adds integration tests for user_replica_set.py
  • Loading branch information
csjiang committed Jan 14, 2022
1 parent b304a2e commit 5b3f0a8
Show file tree
Hide file tree
Showing 19 changed files with 1,291 additions and 274 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
"""add skipped_transaction level column
Revision ID: f775fb87f5ff
Revises: be27a2794f75
Create Date: 2022-01-12 22:32:24.949547
"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = "f775fb87f5ff"
down_revision = "be27a2794f75"
branch_labels = None
depends_on = None


def upgrade():
skippedtransactionlevel = sa.Enum("node", "network", name="skippedtransactionlevel")
skippedtransactionlevel.create(op.get_bind())
op.add_column(
"skipped_transactions",
sa.Column(
"level",
sa.Enum("node", "network", name="skippedtransactionlevel"),
nullable=True,
),
)
op.execute("UPDATE skipped_transactions SET level = 'network'")
op.alter_column("skipped_transactions", "level", nullable=False)


def downgrade():
op.drop_column("skipped_transactions", "level")
bind = op.get_bind()
sa.Enum(name="skippedtransactionlevel").drop(bind, checkfirst=False)
Original file line number Diff line number Diff line change
Expand Up @@ -211,13 +211,16 @@ def test_index_operations_indexing_error(celery_app, celery_app_contracts, monke
web3 = celery_app_contracts["web3"]

# Monkeypatch parse track event to raise an exception
# Here it does not matter which part of the indexing flow throws an exception; the expected behavior is the same
def parse_track_event(*_):
raise Exception("Broken parser")

monkeypatch.setattr(src.tasks.tracks, "parse_track_event", parse_track_event)

seed_contract_data(task, celery_app_contracts, web3)

current_block = None
latest_block = None
try:
with db.scoped_session() as session:
# Catch up the indexer
Expand All @@ -238,6 +241,13 @@ def parse_track_event(*_):
assert False
except IndexingError:
error = get_indexing_error(redis)
errored_block_in_db_results = (
session.query(Block).filter_by(number=error["blocknumber"]).all()
) # should not exist
errored_block_in_db = len(errored_block_in_db_results) != 0
# when errored block is in db, it breaks the consensus mechanism
# for discovery nodes staying in sync
assert not errored_block_in_db
assert error["message"] == "Broken parser"
assert error["count"] == 1

Expand Down
166 changes: 165 additions & 1 deletion discovery-provider/integration_tests/tasks/test_index_playlists.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,12 @@

from integration_tests.challenges.index_helpers import AttrDict, IPFSClient, UpdateTask
from src.challenges.challenge_event_bus import setup_challenge_bus
from src.tasks.playlists import lookup_playlist_record, parse_playlist_event
from src.models import Block, Playlist, SkippedTransaction, SkippedTransactionLevel
from src.tasks.playlists import (
lookup_playlist_record,
parse_playlist_event,
playlist_state_update,
)
from src.utils import helpers
from src.utils.db_session import get_db
from src.utils.playlist_event_constants import playlist_event_types_lookup
Expand Down Expand Up @@ -333,3 +338,162 @@ def test_index_playlist(app):
session,
)
assert playlist_record.is_delete == True


def test_playlist_indexing_skip_tx(app, mocker):
"""Tests that playlists skip cursed txs without throwing an error and are able to process other tx in block"""
with app.app_context():
db = get_db()
ipfs_client = IPFSClient({})
web3 = Web3()
challenge_event_bus = setup_challenge_bus()
update_task = UpdateTask(ipfs_client, web3, challenge_event_bus)

class TestPlaylistTransaction:
pass

blessed_tx_hash = (
"0x34004dfaf5bb7cf9998eaf387b877d72d198c6508608e309df3f89e57def4db3"
)
blessed_tx = TestPlaylistTransaction()
blessed_tx.transactionHash = update_task.web3.toBytes(hexstr=blessed_tx_hash)
cursed_tx_hash = (
"0x5fe51d735309d3044ae30055ad29101018a1a399066f6c53ea23800225e3a3be"
)
cursed_tx = TestPlaylistTransaction()
cursed_tx.transactionHash = update_task.web3.toBytes(hexstr=cursed_tx_hash)
test_block_number = 25278765
test_block_timestamp = 1
test_block_hash = update_task.web3.toHex(block_hash)
test_playlist_factory_txs = [cursed_tx, blessed_tx]
test_timestamp = datetime.utcfromtimestamp(test_block_timestamp)
blessed_playlist_record = Playlist(
blockhash=test_block_hash,
blocknumber=test_block_number,
txhash=blessed_tx_hash,
playlist_id=91232,
is_album=False,
is_private=False,
playlist_name="test",
playlist_contents={},
playlist_image_multihash=None,
playlist_image_sizes_multihash=None,
description="testing!",
upc=None,
is_current=True,
is_delete=True,
last_added_to=None,
updated_at=test_timestamp,
created_at=test_timestamp,
playlist_owner_id=1,
)
cursed_playlist_record = Playlist(
blockhash=test_block_hash,
blocknumber=test_block_number,
txhash=cursed_tx_hash,
playlist_id=91238,
is_album=None,
is_private=None,
playlist_name=None,
playlist_image_multihash=None,
playlist_image_sizes_multihash=None,
description=None,
upc=None,
is_current=True,
is_delete=True,
last_added_to=None,
updated_at=test_timestamp,
created_at=None,
)

mocker.patch(
"src.tasks.playlists.lookup_playlist_record",
side_effect=[cursed_playlist_record, blessed_playlist_record],
autospec=True,
)
mocker.patch(
"src.tasks.playlists.get_playlist_events_tx",
side_effect=[
[], # no playlist created events
[
{
"args": AttrDict(
{
"_playlistId": cursed_playlist_record.playlist_id,
}
)
},
], # playlist deleted event
[],
[],
[],
[],
[],
[],
[],
[],
[], # second tx receipt
[
{
"args": AttrDict(
{
"_playlistId": blessed_playlist_record.playlist_id,
}
)
},
], # playlist deleted event
[],
[],
[],
[],
[],
[],
[],
[],
],
autospec=True,
)

with db.scoped_session() as session:
try:
current_block = Block(
blockhash=test_block_hash,
parenthash=test_block_hash,
number=test_block_number,
is_current=True,
)
session.add(current_block)
(total_changes, updated_playlist_ids_set) = playlist_state_update(
update_task,
update_task,
session,
test_playlist_factory_txs,
test_block_number,
test_block_timestamp,
block_hash,
)
assert len(updated_playlist_ids_set) == 1
assert (
list(updated_playlist_ids_set)[0] == blessed_playlist_record.playlist_id
)
assert total_changes == 1
assert (
session.query(SkippedTransaction)
.filter(
SkippedTransaction.txhash == cursed_playlist_record.txhash,
SkippedTransaction.level == SkippedTransactionLevel.node,
)
.first()
)
assert (
session.query(Playlist)
.filter(Playlist.playlist_id == blessed_playlist_record.playlist_id)
.first()
)
assert (
session.query(Playlist)
.filter(Playlist.playlist_id == cursed_playlist_record.playlist_id)
.first()
) == None
except Exception:
assert False

0 comments on commit 5b3f0a8

Please sign in to comment.