Skip to content

Commit

Permalink
Check backfillers progress (#4321)
Browse files Browse the repository at this point in the history
* Hotfix index_spl_token_backfill.py

* Check backfill progress in get_health

* Move to verbose, fix unit test

* fix import bug
  • Loading branch information
dharit-tan committed Nov 8, 2022
1 parent 1c078bf commit 56158f5
Show file tree
Hide file tree
Showing 5 changed files with 162 additions and 5 deletions.
25 changes: 24 additions & 1 deletion discovery-provider/src/queries/get_health.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,19 @@
from src.queries.get_sol_rewards_manager import get_sol_rewards_manager_health_info
from src.queries.get_sol_user_bank import get_sol_user_bank_health_info
from src.queries.get_spl_audio import get_spl_audio_health_info
from src.tasks.index_rewards_manager_backfill import (
check_progress as rewards_manager_backfill_check_progress,
)
from src.tasks.index_rewards_manager_backfill import (
index_rewards_manager_backfill_complete,
)
from src.tasks.index_spl_token_backfill import (
check_progress as spl_token_backfill_check_progress,
)
from src.tasks.index_spl_token_backfill import index_spl_token_backfill_complete
from src.tasks.index_user_bank_backfill import (
check_progress as user_bank_backfill_check_progress,
)
from src.tasks.index_user_bank_backfill import index_user_bank_backfill_complete
from src.utils import db_session, helpers, redis_connection, web3_provider
from src.utils.config import shared_config
Expand Down Expand Up @@ -338,7 +347,7 @@ def get_health(args: GetHealthArgs, use_redis_cache: bool = True) -> Tuple[Dict,
"latest_block_num": latest_block_num,
"latest_indexed_block_num": latest_indexed_block_num,
"transactions_history_backfill": {
"index_user_backfilling_complete": is_index_user_bank_backfill_complete,
"user_bank_backfilling_complete": is_index_user_bank_backfill_complete,
"rewards_manager_backfilling_complete": is_index_rewards_manager_backfill_complete,
"spl_token_backfilling_complete": is_index_spl_token_backfill_complete,
},
Expand Down Expand Up @@ -405,6 +414,20 @@ def get_health(args: GetHealthArgs, use_redis_cache: bool = True) -> Tuple[Dict,

health_results["tables"] = table_size_info_json

db = db_session.get_db_read_replica()
with db.scoped_session() as session:
spl_token_backfill_progress = spl_token_backfill_check_progress(session)
rewards_manager_backfill_progress = rewards_manager_backfill_check_progress(
session
)
user_bank_backfill_progress = user_bank_backfill_check_progress(session)

health_results["transactions_history_backfill_progress"] = {
"user_bank_backfilling_progress": user_bank_backfill_progress,
"rewards_manager_backfilling_progress": rewards_manager_backfill_progress,
"spl_token_backfilling_progress": spl_token_backfill_progress,
}

unhealthy_blocks = bool(
enforce_block_diff and block_difference > healthy_block_diff
)
Expand Down
2 changes: 2 additions & 0 deletions discovery-provider/src/queries/get_health_unit_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from hexbytes import HexBytes
from src.models.indexing.block import Block
from src.models.indexing.indexing_checkpoints import IndexingCheckpoint
from src.queries.get_health import get_health
from src.utils.redis_constants import (
challenges_last_processed_event_redis_key,
Expand Down Expand Up @@ -469,6 +470,7 @@ def get_block(_u1, _u2): # unused
is_current=True,
)
)
IndexingCheckpoint.__table__.create(db_mock._engine)

args = {"verbose": True}
health_results, error = get_health(args)
Expand Down
48 changes: 46 additions & 2 deletions discovery-provider/src/tasks/index_rewards_manager_backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
import logging
import time
from decimal import Decimal
from typing import Callable, Dict, List, Optional, TypedDict
from typing import Any, Callable, Dict, List, Optional, TypedDict

import base58
from redis import Redis
from sqlalchemy import asc, desc, or_
from sqlalchemy import and_, asc, desc, or_
from sqlalchemy.orm.session import Session
from src.models.indexing.indexing_checkpoints import IndexingCheckpoint
from src.models.rewards.challenge import Challenge, ChallengeType
Expand Down Expand Up @@ -493,6 +493,50 @@ def get_transaction_signatures(
return transaction_signatures


def check_progress(session: Session):
stop_row = (
session.query(IndexingCheckpoint)
.filter(
IndexingCheckpoint.tablename == index_rewards_manager_backfill_tablename
)
.first()
)
if not stop_row:
return None
ret: Any = {}
ret["stop_slot"] = stop_row.last_checkpoint
ret["stop_sig"] = stop_row.signature
latest_processed_row = (
session.query(RewardsManagerBackfillTransaction)
.order_by(desc(RewardsManagerBackfillTransaction.slot))
.first()
)
if not latest_processed_row:
return ret
ret["latest_processed_sig"] = latest_processed_row.signature
ret["latest_processed_slot"] = latest_processed_row.slot
min_row = (
session.query(AudioTransactionsHistory)
.filter(
and_(
or_(
AudioTransactionsHistory.transaction_type
== TransactionType.user_reward,
AudioTransactionsHistory.transaction_type
== TransactionType.trending_reward,
),
AudioTransactionsHistory.slot < ret["stop_slot"],
)
)
.order_by(asc(AudioTransactionsHistory.slot))
).first()
if not min_row:
return ret
ret["min_slot"] = min_row.slot
ret["min_sig"] = min_row.signature
return ret


def process_transaction_signatures(
solana_client_manager: SolanaClientManager,
db: SessionManager,
Expand Down
46 changes: 45 additions & 1 deletion discovery-provider/src/tasks/index_spl_token_backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import base58
from redis import Redis
from solana.publickey import PublicKey
from sqlalchemy import and_, asc, or_
from sqlalchemy import and_, asc, desc, or_
from sqlalchemy.orm.session import Session
from src.models.indexing.indexing_checkpoints import IndexingCheckpoint
from src.models.indexing.spl_token_backfill_transaction import (
Expand Down Expand Up @@ -571,6 +571,50 @@ def find_true_stop_sig(
return stop_sig


def check_progress(session: Session):
stop_row = (
session.query(IndexingCheckpoint)
.filter(IndexingCheckpoint.tablename == index_spl_token_backfill_tablename)
.first()
)
if not stop_row:
return None
ret: Any = {}
ret["stop_slot"] = stop_row.last_checkpoint
ret["stop_sig"] = stop_row.signature
latest_processed_row = (
session.query(SPLTokenBackfillTransaction)
.order_by(desc(SPLTokenBackfillTransaction.last_scanned_slot))
.first()
)
if not latest_processed_row:
return ret
ret["latest_processed_sig"] = latest_processed_row.signature
ret["latest_processed_slot"] = latest_processed_row.last_scanned_slot
min_row = (
session.query(AudioTransactionsHistory)
.filter(
and_(
or_(
AudioTransactionsHistory.transaction_type.in_(purchase_types),
and_(
AudioTransactionsHistory.transaction_type
== TransactionType.transfer,
AudioTransactionsHistory.method == TransactionMethod.receive,
),
),
AudioTransactionsHistory.slot < ret["stop_slot"],
)
)
.order_by(asc(AudioTransactionsHistory.slot))
).first()
if not min_row:
return ret
ret["min_slot"] = min_row.slot
ret["min_sig"] = min_row.signature
return ret


@celery.task(name="index_spl_token_backfill", bind=True)
@save_duration_metric(metric_group="celery_task")
def index_spl_token_backfill(self):
Expand Down
46 changes: 45 additions & 1 deletion discovery-provider/src/tasks/index_user_bank_backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import re
import time
from decimal import Decimal
from typing import List, Optional, TypedDict
from typing import Any, List, Optional, TypedDict

from redis import Redis
from solana.publickey import PublicKey
Expand Down Expand Up @@ -603,6 +603,50 @@ def find_true_stop_sig(
return stop_sig


def check_progress(session: Session):
stop_row = (
session.query(IndexingCheckpoint)
.filter(IndexingCheckpoint.tablename == index_user_bank_backfill_tablename)
.first()
)
if not stop_row:
return None
ret: Any = {}
ret["stop_slot"] = stop_row.last_checkpoint
ret["stop_sig"] = stop_row.signature
latest_processed_row = (
session.query(UserBankBackfillTx)
.order_by(desc(UserBankBackfillTx.slot))
.first()
)
if not latest_processed_row:
return ret
ret["latest_processed_sig"] = latest_processed_row.signature
ret["latest_processed_slot"] = latest_processed_row.slot
min_row = (
session.query(AudioTransactionsHistory)
.filter(
and_(
or_(
AudioTransactionsHistory.transaction_type == TransactionType.tip,
and_(
AudioTransactionsHistory.transaction_type
== TransactionType.transfer,
AudioTransactionsHistory.method == TransactionMethod.send,
),
),
AudioTransactionsHistory.slot < ret["stop_slot"],
)
)
.order_by(asc(AudioTransactionsHistory.slot))
).first()
if not min_row:
return ret
ret["min_slot"] = min_row.slot
ret["min_sig"] = min_row.signature
return ret


# ####### CELERY TASKS ####### #
@celery.task(name="index_user_bank_backfill", bind=True)
@save_duration_metric(metric_group="celery_task")
Expand Down

0 comments on commit 56158f5

Please sign in to comment.