-
Notifications
You must be signed in to change notification settings - Fork 106
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[PAY-664] Backfill audio_tx_history userbank indexing #4242
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
General approach looks good to me here! A few questions –
- How do we know what the backfill stop slot is? That will be different for each DN, depending on when it upgrades to the version with the new indexers, right?
- How do we know when the backfiller is done? We probably want some sort of healthcheck here
Also, let's make sure we have a linear for deleting the backfillers once this is out and nodes are all caught up 🙏
@@ -17,6 +17,7 @@ trending_refresh_seconds = 3600 | |||
infra_setup = | |||
indexing_transaction_index_sort_order_start_block = | |||
get_users_cnode_ttl_sec = 5 | |||
backfill_stop_sig = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is there a single backfill stop signature, or is it different for each indexer?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just pushed changes that'll lookup a sig for each indexer individually. But fwiw I'm pretty sure I observed it working with having the same sig for all 3.
for tx in reward_manager_txs | ||
if tx["transfer_instruction"] is not None | ||
] | ||
user_challenges = ( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we need to be careful here not to re-create these old user_challenges
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we're just querying them here, think I removed the code that inserts into the challenges db.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of creating three new tables, maybe you can utilize the indexing_checkpoints
table and add a new column there for signature?
discovery-provider/alembic/versions/93fc0b994aba_spl_token_backfill.py
Outdated
Show resolved
Hide resolved
op.drop_table("user_bank_backfill_txs") | ||
op.drop_index( | ||
op.f("idx_user_bank_backfill_txs_slot"), | ||
table_name="user_bank_backfill_txs", | ||
info={"if_exists": True}, | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe drop table actually drops related indexes, but can leave in
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated_at = Column( | ||
DateTime, nullable=False, default=func.now(), onupdate=func.now() | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a reason for this to have updated_at while rewards manager and user bank backfill do not
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ditto below, just copied code from index_spl_token.py
latest_sol_user_bank_backfill_slot_key = "latest_sol_slot:user_bank_backfill" | ||
latest_sol_aggregate_tips_slot_key = "latest_sol_slot:aggregate_tips" | ||
latest_sol_plays_slot_key = "latest_sol_slot:plays" | ||
latest_sol_rewards_manager_slot_key = "latest_sol_slot:rewards_manager" | ||
latest_sol_rewards_manager_backfill_slot_key = ( | ||
"latest_sol_slot:rewards_manager_backfill" | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we also want latest_sol_slot:spl_token_backfill
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for some reason index_spl_token.py
does it differently, uses a redis queue with its own key defined in the file. And we copied over the logic to this new backfilling task so just gonna follow that pattern.
page_count = 0 | ||
|
||
# Traverse recent records until an intersection is found with latest slot | ||
while not intersection_found: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what's the use for the intersection_found if we are only backfilling - it's used in the current flows to find the point from which we move forward from, but we should be moving backwards in this job - if my understanding is correct
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What I image you're thinking with this is that you're going back to the very beginning and reindexing from there, which works, but we can just go backwards, lmk if this doesn't make sense
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
synced with @jowlee, his solution is definitely better but since we're on a time crunch I will leave well enough alone for now.
record = session.query(SPLTokenBackfillTransaction).first() | ||
if record: | ||
record.last_scanned_slot = last_scanned_slot | ||
record.signature = last_scanned_signature | ||
else: | ||
record = SPLTokenBackfillTransaction( | ||
last_scanned_slot=last_scanned_slot, | ||
signature=last_scanned_signature, | ||
) | ||
session.add(record) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
are you making a new record here. in the query below, we call .first with no order by so it won't guarantee the highest/lowest backfill tx
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ditto below.
highest_slot_query = session.query( | ||
SPLTokenBackfillTransaction.last_scanned_slot | ||
).first() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems like we're creating multiple rows in this tbl, and there's no order by here, do it's not guaranteed to be the highest slot.
Also, I'm not sure this is what you want for a backfilling job
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think actually this table only ever has one row and we update it with the latest processed slot.
tx_sig_db_count = ( | ||
session.query(UserBankBackfillTx).filter(UserBankBackfillTx.signature == tx_sig) | ||
).count() | ||
exists = tx_sig_db_count > 0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
instead of .count
you can probably use .exists() or .first() which might be slightly more performant, not sure
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM assuming we've addressed @jowlee's points - have one question about the backfill stop slot
8b009d3
to
748a40c
Compare
|
||
def get_backfill_health_info(): | ||
db = db_session.get_db_read_replica() | ||
solana_client_manager = SolanaClientManager(shared_config["solana"]["endpoint"]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This breaks with this error, can anyone tell me why?
{"levelno": 40, "level": "ERROR", "msg": "Non Audius-derived exception\nTraceback (most recent call last):\n File \"/usr/lib/python3.9/site-packages/flask/app.py\", line 1838, in full_dispatch_request\n rv = self.dispatch_request()\n File \"/usr/lib/python3.9/site-packages/flask/app.py\", line 1824, in dispatch_request\n return self.view_functions[rule.endpoint](**req.view_args)\n File \"/audius-discovery-provider/src/queries/health_check.py\", line 58, in health_check\n (health_results, error) = get_health(args)\n File \"/audius-discovery-provider/src/queries/get_health.py\", line 207, in get_health\n backfill_health_info = get_backfill_health_info()\n File \"/audius-discovery-provider/src/queries/get_health.py\", line 688, in get_backfill_health_info\n user_bank = index_user_bank_backfill_health_check(\n File \"/audius-discovery-provider/src/tasks/index_user_bank_backfill.py\", line 514, in check_if_backfilling_complete\n one_sig_before_stop = solana_client_manager.get_signatures_for_address(\n File \"/audius-discovery-provider/src/solana/solana_client_manager.py\", line 111, in get_signatures_for_address\n return _try_all_with_timeout(\n File \"/audius-discovery-provider/src/solana/solana_client_manager.py\", line 204, in _try_all_with_timeout\n raise Exception(message)\nException: solana_client_manager.py | get_signatures_for_address | All requests failed", "timestamp": "2022-11-03 09:02:18,185", "pathname": "/audius-discovery-provider/src/app.py", "funcName": "handle_exception", "lineno": 303, "service": "server", "otelSpanID": "e3d7ae7fe1391fcc", "otelTraceID": "08a3b436ad5a28ea017f206b5e66503e", "otelServiceName": "discovery-provider"}
748a40c
to
b449193
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
amazing
b449193
to
9e60614
Compare
Synced with Joe and need to merge!
Description
Backfill user bank transaction data into
audio_transactions_history
by making a duplicates ofindex_user_bank.py
that stops at the transaction whose signature matches thebackfill_stop_sig
arg.The following commits do the same for
index_rewards_manager.py
andindex_spl_token.py
.Tests
Tested on remote-dev that is indexing prod. Set
backfill_stop_sig
to a known signature and observed the backfilling occurring.Monitoring - How will this change be monitored? Are there sufficient logs / alerts?
Adds logs for the backfilling indexer.